[GitHub] phoenix issue #308: Client-side hash aggregation
Github user geraldss commented on the issue: https://github.com/apache/phoenix/pull/308 Thanks @JamesRTaylor for the reviews and suggestions. ---
[GitHub] phoenix issue #308: Client-side hash aggregation
Github user geraldss commented on the issue: https://github.com/apache/phoenix/pull/308 @JamesRTaylor - Handled reverse sort; added explanatory comments about sorting; added tests for sorting and non-sorting, including EXPLAIN. Pushed changes; uploaded new .patch file; resubmitted patch. ---
[GitHub] phoenix issue #308: Client-side hash aggregation
Github user geraldss commented on the issue: https://github.com/apache/phoenix/pull/308 @JamesRTaylor - Done. I switched the PR to 4.x-HBase-1.4, attached the .patch file, and issued "Submit Patch". ---
[GitHub] phoenix issue #308: Client-side hash aggregation
Github user geraldss commented on the issue: https://github.com/apache/phoenix/pull/308 @JamesRTaylor - OK, I will submit a new PR. Can you comment on my last push and comments? ---
[GitHub] phoenix issue #308: Client-side hash aggregation
Github user geraldss commented on the issue: https://github.com/apache/phoenix/pull/308 Hi @JamesRTaylor, I pushed another change and replied with some comments. Please review, thanks. ---
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r205297065 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java --- @@ -135,17 +142,25 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators); aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators); } else { -if (!groupBy.isOrderPreserving()) { -int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( -QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); -List<Expression> keyExpressions = groupBy.getKeyExpressions(); +List<Expression> keyExpressions = groupBy.getKeyExpressions(); +if (groupBy.isOrderPreserving()) { +aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions); +} else { +int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt +(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size()); for (Expression keyExpression : keyExpressions) { keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true)); } -iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize()); + +if (useHashAgg) { +boolean sort = orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY; +aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions, sort); --- End diff -- Same as my previous comment: the GROUP BY cannot produce a reverse sort. ---
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r205296815 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java --- @@ -135,17 +142,25 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators); aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators); } else { -if (!groupBy.isOrderPreserving()) { -int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( -QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); -List<Expression> keyExpressions = groupBy.getKeyExpressions(); +List<Expression> keyExpressions = groupBy.getKeyExpressions(); +if (groupBy.isOrderPreserving()) { +aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions); +} else { +int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt +(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size()); for (Expression keyExpression : keyExpressions) { keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true)); } -iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize()); + +if (useHashAgg) { +boolean sort = orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY; --- End diff -- The GROUP BY cannot cause REV_ROW_KEY_ORDER_BY, because a GROUP BY can neither specify nor produce descending keys. This is a pre-existing assumption and design choice in SORT_MERGE_JOIN: the SORT_MERGE_JOIN creates its own forward sort, and its Tracker reports a forward sort. ---
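The branch structure being discussed can be summarized as a small decision function. This is a hypothetical Java model of the choice made in ClientAggregatePlan.iterator(), not Phoenix code: the enum names other than FWD_ROW_KEY_ORDER_BY and REV_ROW_KEY_ORDER_BY (which appear in the comment above) are illustrative assumptions. It shows why only a forward sort is ever requested: since a GROUP BY never yields REV_ROW_KEY_ORDER_BY, the hash path only has to distinguish "forward order required" from "no order required".

```java
// Hypothetical model of the aggregation branch in ClientAggregatePlan.iterator();
// the enums and names here are illustrative, not Phoenix's API.
public class AggStrategySketch {
    public enum OrderBy { NONE, FWD_ROW_KEY_ORDER_BY, REV_ROW_KEY_ORDER_BY }

    public enum Strategy {
        STREAM_AGGREGATE,           // input already grouped: aggregate adjacent rows
        HASH_AGGREGATE,             // hash the groups, emit them in arbitrary order
        HASH_AGGREGATE_THEN_SORT,   // hash the groups, then sort distinct keys forward
        SORT_THEN_STREAM_AGGREGATE  // pre-patch path: sort all rows, then stream
    }

    public static Strategy choose(boolean orderPreserving, boolean useHashAgg, OrderBy orderBy) {
        if (orderPreserving) {
            return Strategy.STREAM_AGGREGATE;
        }
        if (!useHashAgg) {
            return Strategy.SORT_THEN_STREAM_AGGREGATE;
        }
        // A GROUP BY never yields REV_ROW_KEY_ORDER_BY, so a forward sort is the only one needed.
        return orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY
                ? Strategy.HASH_AGGREGATE_THEN_SORT
                : Strategy.HASH_AGGREGATE;
    }
}
```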
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r205296403 --- Diff: phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientHashAggregateIT.java --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Properties; + +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; +import org.junit.Test; + +public class ClientHashAggregateIT extends ParallelStatsDisabledIT { --- End diff -- The current tests cover the case where a forward sort is required. My comment below addresses reverse sort. ---
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r205296128 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java --- @@ -183,13 +198,15 @@ public ExplainPlan getExplainPlan() throws SQLException { if (where != null) { planSteps.add("CLIENT FILTER BY " + where.toString()); } -if (!groupBy.isEmpty()) { -if (!groupBy.isOrderPreserving()) { -planSteps.add("CLIENT SORTED BY " + groupBy.getKeyExpressions().toString()); -} +if (groupBy.isEmpty()) { +planSteps.add("CLIENT AGGREGATE INTO SINGLE ROW"); +} else if (groupBy.isOrderPreserving()) { planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString()); +} else if (useHashAgg) { +planSteps.add("CLIENT HASH AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString()); --- End diff -- Done. ---
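The EXPLAIN branch above picks one aggregation step per case. Below is a minimal Java sketch of that selection, assuming a plain list of expression strings in place of groupBy.getExpressions()/getKeyExpressions(). The final else branch is truncated in the quoted diff, so its content here (a "CLIENT SORTED BY" step followed by a streaming aggregate) is an assumption reconstructed from the removed lines; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the EXPLAIN step selection in the diff above.
// One list stands in for both getExpressions() and getKeyExpressions().
public class ExplainStepsSketch {
    public static List<String> aggregateSteps(List<String> groupByExprs, boolean orderPreserving, boolean useHashAgg) {
        List<String> steps = new ArrayList<>();
        if (groupByExprs.isEmpty()) {
            steps.add("CLIENT AGGREGATE INTO SINGLE ROW");
        } else if (orderPreserving) {
            steps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + groupByExprs);
        } else if (useHashAgg) {
            steps.add("CLIENT HASH AGGREGATE INTO DISTINCT ROWS BY " + groupByExprs);
        } else {
            // Assumed: sort every row first, then aggregate adjacent rows.
            steps.add("CLIENT SORTED BY " + groupByExprs);
            steps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + groupByExprs);
        }
        return steps;
    }
}
```

Distinct step text for the hash path is what lets an EXPLAIN-based test tell the two non-order-preserving strategies apart.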
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r205206944 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java --- @@ -135,17 +142,24 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators); aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators); } else { -if (!groupBy.isOrderPreserving()) { -int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( -QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); -List<Expression> keyExpressions = groupBy.getKeyExpressions(); +List<Expression> keyExpressions = groupBy.getKeyExpressions(); +if (groupBy.isOrderPreserving()) { +aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions); +} else { +int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt +(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size()); for (Expression keyExpression : keyExpressions) { keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true)); } -iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize()); + +if (useHashAgg) { +aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions); +} else { +iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize()); +aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions); +} } -aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, groupBy.getKeyExpressions()); aggResultIterator = new GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators); } --- End diff -- Hi @JamesRTaylor - please review. The sort is now done only when necessary. Thanks. ---
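With the sort removed from the hash path, grouping is done in a single pass that keeps one map entry per distinct key. Below is a minimal Java sketch of that idea, under loudly stated assumptions: a SUM over long values stands in for the Aggregator[] array that the real iterator keeps per group, and keys are wrapped in ByteBuffer because raw byte[] uses identity equals/hashCode (the patch keys its map on ImmutableBytesWritable instead). The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of in-memory hash aggregation: one pass, one entry per group.
public class HashAggSketch {
    public static Map<ByteBuffer, Long> sumByKey(byte[][] keys, long[] values) {
        Map<ByteBuffer, Long> groups = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            // ByteBuffer compares contents, so equal key bytes land in the same group;
            // merge() creates the group on first sight and folds later rows into it.
            groups.merge(ByteBuffer.wrap(keys[i]), values[i], Long::sum);
        }
        return groups;
    }
}
```

No ordering is imposed here, which is why a separate sort is needed only when the query asks for row-key order.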
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r204495020 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java --- @@ -135,17 +142,24 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators); aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators); } else { -if (!groupBy.isOrderPreserving()) { -int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( -QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); -List<Expression> keyExpressions = groupBy.getKeyExpressions(); +List<Expression> keyExpressions = groupBy.getKeyExpressions(); +if (groupBy.isOrderPreserving()) { +aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions); +} else { +int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt +(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size()); for (Expression keyExpression : keyExpressions) { keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true)); } -iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize()); + +if (useHashAgg) { +aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions); +} else { +iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize()); +aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions); +} } -aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, groupBy.getKeyExpressions()); aggResultIterator = new GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators); } --- End diff -- @JamesRTaylor - Done. Please review. Thanks. ---
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r204197232 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.memory.MemoryManager.MemoryChunk; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.SizedUtil; +import org.apache.phoenix.util.TupleUtil; + +/** + * + * This class implements client-side hash aggregation in memory. + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751. 
+ * + */ +public class ClientHashAggregatingResultIterator +implements AggregatingResultIterator { + +private static final int HASH_AGG_INIT_SIZE = 64*1024; +private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024; +private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0]; +private final ResultIterator resultIterator; +private final Aggregators aggregators; +private final List<Expression> groupByExpressions; +private final int thresholdBytes; +private final MemoryChunk memoryChunk; +private HashMap<ImmutableBytesWritable, Aggregator[]> hash; +private List<ImmutableBytesWritable> keyList; +private Iterator<ImmutableBytesWritable> keyIterator; + +public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators, + List<Expression> groupByExpressions, int thresholdBytes) { + +Objects.requireNonNull(resultIterator); +Objects.requireNonNull(aggregators); +Objects.requireNonNull(groupByExpressions); +this.resultIterator = resultIterator; +this.aggregators = aggregators; +this.groupByExpressions = groupByExpressions; +this.thresholdBytes = thresholdBytes; +memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE); +} + +@Override +public Tuple next() throws SQLException { +if (keyIterator == null) { +hash = populateHash(); +keyList = sortKeys(); --- End diff -- @JamesRTaylor - I tried that as well, as part of trying to avoid this sort. That is, I tried catching this special case in ClientAggregatePlan and wrapping an OrderedAggregatingResultIterator there. It did not work, which is why I relented and implemented the sort this way. One other detail: for the special case, this sort is faster than the generic sort because it is less general. Bottom line, I have been unable to make the patch correct without this sort. ---
[GitHub] phoenix issue #308: Client-side hash aggregation
Github user geraldss commented on the issue: https://github.com/apache/phoenix/pull/308 @JamesRTaylor - pushed another change per your feedback. Fingers crossed :) Thanks. ---
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r20419 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.memory.MemoryManager.MemoryChunk; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.SizedUtil; +import org.apache.phoenix.util.TupleUtil; + +/** + * + * This class implements client-side hash aggregation in memory. + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751. 
+ * + */ +public class ClientHashAggregatingResultIterator +implements AggregatingResultIterator { + +private static final int HASH_AGG_INIT_SIZE = 64*1024; +private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024; +private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0]; +private final ResultIterator resultIterator; +private final Aggregators aggregators; +private final List<Expression> groupByExpressions; +private final int thresholdBytes; +private final MemoryChunk memoryChunk; +private HashMap<ImmutableBytesWritable, Aggregator[]> hash; +private List<ImmutableBytesWritable> keyList; +private Iterator<ImmutableBytesWritable> keyIterator; + +public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators, + List<Expression> groupByExpressions, int thresholdBytes) { + +Objects.requireNonNull(resultIterator); +Objects.requireNonNull(aggregators); +Objects.requireNonNull(groupByExpressions); +this.resultIterator = resultIterator; +this.aggregators = aggregators; +this.groupByExpressions = groupByExpressions; +this.thresholdBytes = thresholdBytes; +memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE); +} + +@Override +public Tuple next() throws SQLException { +if (keyIterator == null) { +hash = populateHash(); +keyList = sortKeys(); --- End diff -- I tried avoiding the sort, but could not make the code work without it. In my use case, when an ORDER BY followed the GROUP BY and that ORDER BY was on the primary key, it was not applied; when the ORDER BY was not on the primary key, it was applied. I left this sort in because (1) it introduces no error; (2) it runs after the grouping, so it is more efficient than the current solution, which sorts before the grouping; and (3) it avoids the problem above. ---
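The efficiency argument in point (2) can be made concrete: sorting all N input rows before grouping costs O(N log N) comparisons, while sorting only the G distinct group keys afterwards costs O(G log G), with G at most N. Below is a minimal Java sketch of the post-grouping sort, assuming the group keys are raw row-key bytes compared in HBase-style unsigned lexicographic order; the class and method names are hypothetical, and the patch's own sortKeys() body is not shown in the quoted diff.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: sort only the distinct group keys, after hashing.
public class SortAfterGroupingSketch {
    // Unsigned lexicographic byte order: bytes compare as 0..255, a shorter prefix sorts first.
    static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHIC = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    public static List<byte[]> sortedGroupKeys(Collection<byte[]> distinctKeys) {
        List<byte[]> keys = new ArrayList<>(distinctKeys);
        keys.sort(UNSIGNED_LEXICOGRAPHIC); // G log G work instead of N log N
        return keys;
    }
}
```

The unsigned comparison matters: with Java's signed byte, 0x80 would otherwise sort before 0x01.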
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r204192760 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.memory.MemoryManager.MemoryChunk; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.SizedUtil; +import org.apache.phoenix.util.TupleUtil; + +/** + * + * This class implements client-side hash aggregation in memory. + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751. 
+ * + */ +public class ClientHashAggregatingResultIterator +implements AggregatingResultIterator { + +private static final int HASH_AGG_INIT_SIZE = 64*1024; +private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024; +private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0]; +private final ResultIterator resultIterator; +private final Aggregators aggregators; +private final List<Expression> groupByExpressions; +private final int thresholdBytes; +private final MemoryChunk memoryChunk; +private HashMap<ImmutableBytesWritable, Aggregator[]> hash; +private List<ImmutableBytesWritable> keyList; +private Iterator<ImmutableBytesWritable> keyIterator; + +public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators, + List<Expression> groupByExpressions, int thresholdBytes) { + +Objects.requireNonNull(resultIterator); +Objects.requireNonNull(aggregators); +Objects.requireNonNull(groupByExpressions); +this.resultIterator = resultIterator; +this.aggregators = aggregators; +this.groupByExpressions = groupByExpressions; +this.thresholdBytes = thresholdBytes; +memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE); +} + +@Override +public Tuple next() throws SQLException { +if (keyIterator == null) { +hash = populateHash(); +keyList = sortKeys(); +keyIterator = keyList.iterator(); +} + +if (!keyIterator.hasNext()) { +return null; +} + +ImmutableBytesWritable key = keyIterator.next(); +Aggregator[] rowAggregators = hash.get(key); +byte[] value = aggregators.toBytes(rowAggregators); +Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length)); +return tuple; +} + +@Override +public void close() throws SQLException { +keyIterator = null; +keyList = null; +hash = null; +
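The quoted next() defers all work to the first call: it drains the input into the hash, fixes the key order once, then hands out one group per call and returns null when exhausted, while close() drops the references. Below is a hypothetical generic Java sketch of that lazy pattern. A Supplier stands in for populateHash(), and a nullable Comparator stands in for the optional sort (null meaning no sort, so output follows HashMap iteration order); none of these names are Phoenix API.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of the lazy next()/close() pattern quoted above, not Phoenix code.
public class LazyHashAggIterator<K, V> {
    private final Supplier<Map<K, V>> populateHash; // plays the role of populateHash()
    private final Comparator<K> keyOrder;           // null means "no sort needed"
    private Map<K, V> hash;
    private Iterator<K> keyIterator;

    public LazyHashAggIterator(Supplier<Map<K, V>> populateHash, Comparator<K> keyOrder) {
        this.populateHash = populateHash;
        this.keyOrder = keyOrder;
    }

    /** Returns the next (key, aggregate) pair, or null once every group has been emitted. */
    public Map.Entry<K, V> next() {
        if (keyIterator == null) {
            // First call: drain the whole input into the hash, then fix the output order.
            hash = populateHash.get();
            List<K> keys = new ArrayList<>(hash.keySet());
            if (keyOrder != null) {
                keys.sort(keyOrder); // sorts the distinct keys only, after grouping
            }
            keyIterator = keys.iterator();
        }
        if (!keyIterator.hasNext()) {
            return null;
        }
        K key = keyIterator.next();
        return new AbstractMap.SimpleImmutableEntry<>(key, hash.get(key));
    }

    /** Releases the groups so they can be garbage-collected. */
    public void close() {
        keyIterator = null;
        hash = null;
    }
}
```

Returning null at exhaustion follows the same convention as the quoted next().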
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r204192752 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java --- @@ -0,0 +1,189 @@
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r204192739 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.memory.MemoryManager.MemoryChunk; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.SizedUtil; +import org.apache.phoenix.util.TupleUtil; + +/** + * + * This class implements client-side hash aggregation in memory. + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751. 
+ * + */ +public class ClientHashAggregatingResultIterator +implements AggregatingResultIterator { + +private static final int HASH_AGG_INIT_SIZE = 64*1024; +private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024; +private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0]; +private final ResultIterator resultIterator; +private final Aggregators aggregators; +private final List groupByExpressions; +private final int thresholdBytes; +private final MemoryChunk memoryChunk; +private HashMap hash; +private List keyList; +private Iterator keyIterator; + +public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators, + List groupByExpressions, int thresholdBytes) { + +Objects.requireNonNull(resultIterator); +Objects.requireNonNull(aggregators); +Objects.requireNonNull(groupByExpressions); +this.resultIterator = resultIterator; +this.aggregators = aggregators; +this.groupByExpressions = groupByExpressions; +this.thresholdBytes = thresholdBytes; +memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE); +} + +@Override +public Tuple next() throws SQLException { +if (keyIterator == null) { +hash = populateHash(); +keyList = sortKeys(); +keyIterator = keyList.iterator(); +} + +if (!keyIterator.hasNext()) { +return null; +} + +ImmutableBytesWritable key = keyIterator.next(); +Aggregator[] rowAggregators = hash.get(key); +byte[] value = aggregators.toBytes(rowAggregators); +Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length)); +return tuple; +} + +@Override +public void close() throws SQLException { +keyIterator = null; +keyList = null; +hash = null; +
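This revision has the iterator allocate a client-side memory chunk (`CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE`) from the connection's memory manager. The underlying idea is that an in-memory hash aggregation should charge memory for each new group against a budget. Below is a minimal sketch of that idea. `BudgetedHashAggregator`, the per-entry overhead figure, and the SUM-only aggregate are all hypothetical. The budget check is a plain counter, not Phoenix's `MemoryManager` API.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of in-memory hash aggregation with a byte budget.
 * Rows are (groupKey, value) pairs; the aggregate is SUM(value) per key.
 * The budget is a simple counter standing in for client-side memory
 * accounting; it is NOT Phoenix's MemoryManager API.
 */
final class BudgetedHashAggregator {
    // Rough per-group overhead estimate; an assumption, not a measured value.
    private static final long ENTRY_OVERHEAD_BYTES = 64;

    private final long budgetBytes;
    private final Map<String, Long> sums = new HashMap<>();
    private long usedBytes = 0;

    BudgetedHashAggregator(long budgetBytes) {
        this.budgetBytes = budgetBytes;
    }

    /** Folds one input row into its group; memory is charged only when a new group appears. */
    void add(String groupKey, long value) {
        Long current = sums.get(groupKey);
        if (current == null) {
            // Estimate: fixed overhead plus 2 bytes per UTF-16 char of the key.
            long needed = ENTRY_OVERHEAD_BYTES + 2L * groupKey.length();
            if (usedBytes + needed > budgetBytes) {
                throw new IllegalStateException(
                        "hash aggregation exceeded budget of " + budgetBytes + " bytes");
            }
            usedBytes += needed;
            sums.put(groupKey, value);
        } else {
            sums.put(groupKey, current + value);
        }
    }

    long sum(String groupKey) { return sums.getOrDefault(groupKey, 0L); }

    int groupCount() { return sums.size(); }

    long usedBytes() { return usedBytes; }
}
```

Charging memory per group rather than per input row reflects the fact that a hash aggregation's footprint grows with the number of distinct keys, not with the input size.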
[GitHub] phoenix issue #308: Client-side hash aggregation
Github user geraldss commented on the issue: https://github.com/apache/phoenix/pull/308 @JamesRTaylor - I made the changes and they are ready for review. Thanks. ---
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r203926206 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.TupleUtil; + +/** + * + * This class implements client-side hash aggregation in memory. + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751. 
+ * + */ +public class ClientHashAggregatingResultIterator +implements AggregatingResultIterator { + +private static final int HASH_AGG_INIT_SIZE = 64*1024; +private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0]; +private final ResultIterator resultIterator; +private final Aggregators aggregators; +private final List groupByExpressions; +private final int thresholdBytes; +private HashMap hash; +private List keyList; +private Iterator keyIterator; + +public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List groupByExpressions, int thresholdBytes) { +Objects.requireNonNull(resultIterator); +Objects.requireNonNull(aggregators); +Objects.requireNonNull(groupByExpressions); +this.resultIterator = resultIterator; +this.aggregators = aggregators; +this.groupByExpressions = groupByExpressions; +this.thresholdBytes = thresholdBytes; +} + +@Override +public Tuple next() throws SQLException { +if (keyIterator == null) { +hash = populateHash(); +keyList = sortKeys(); +keyIterator = keyList.iterator(); +} + +if (!keyIterator.hasNext()) { +return null; +} + +ImmutableBytesWritable key = keyIterator.next(); +Aggregator[] rowAggregators = hash.get(key); +byte[] value = aggregators.toBytes(rowAggregators); +Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length)); +return tuple; +} + +@Override +public void close() throws SQLException { +keyIterator = null; +keyList = null; +hash = null; +resultIterator.close(); +} + +@Override +public Aggregator[] aggregate(Tuple result) { +Aggregator[] rowAggregators = aggregators.getAggregators(); +aggregators.reset(rowAggregators); +aggregators.aggregate(rowAggregators, result); +return rowAggregators; +} + +@Override +public void explain(List planSteps) { +resultIterator.explain(planSteps); +} + +@Override +
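The hash in this iterator is keyed by `ImmutableBytesWritable`, not by a raw `byte[]`. This matters because a Java array inherits identity-based `equals`/`hashCode` from `Object`. Two arrays holding the same serialized group key would therefore land in different entries. Here is a minimal sketch of the problem and of a content-based wrapper. `ByteKey` and `ByteKeyDemo` are hypothetical names that stand in for the role `ImmutableBytesWritable` plays in the diff.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical content-based key wrapper for serialized group-by keys. */
final class ByteKey {
    private final byte[] bytes;

    ByteKey(byte[] bytes) {
        this.bytes = bytes.clone(); // defensive copy so later mutation cannot change the hash
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ByteKey && Arrays.equals(bytes, ((ByteKey) o).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }
}

final class ByteKeyDemo {
    /** Counts rows per key and returns the number of distinct groups the map sees. */
    static int distinctGroups(byte[][] keys, boolean wrap) {
        Map<Object, Integer> counts = new HashMap<>();
        for (byte[] k : keys) {
            Object mapKey = wrap ? new ByteKey(k) : k; // raw arrays compare by identity
            counts.merge(mapKey, 1, Integer::sum);
        }
        return counts.size();
    }
}
```

When `wrap` is false, equal keys stored in distinct arrays are counted as separate groups, which would silently split one GROUP BY group into several.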
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r203926179 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.TupleUtil; + +/** + * + * This class implements client-side hash aggregation in memory. + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751. 
+ * + */ +public class ClientHashAggregatingResultIterator +implements AggregatingResultIterator { + +private static final int HASH_AGG_INIT_SIZE = 64*1024; +private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0]; +private final ResultIterator resultIterator; +private final Aggregators aggregators; +private final List groupByExpressions; +private final int thresholdBytes; +private HashMap hash; +private List keyList; +private Iterator keyIterator; + +public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List groupByExpressions, int thresholdBytes) { +Objects.requireNonNull(resultIterator); +Objects.requireNonNull(aggregators); +Objects.requireNonNull(groupByExpressions); +this.resultIterator = resultIterator; +this.aggregators = aggregators; +this.groupByExpressions = groupByExpressions; +this.thresholdBytes = thresholdBytes; +} + +@Override +public Tuple next() throws SQLException { +if (keyIterator == null) { +hash = populateHash(); +keyList = sortKeys(); +keyIterator = keyList.iterator(); +} + +if (!keyIterator.hasNext()) { +return null; +} + +ImmutableBytesWritable key = keyIterator.next(); +Aggregator[] rowAggregators = hash.get(key); +byte[] value = aggregators.toBytes(rowAggregators); +Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length)); +return tuple; +} + +@Override +public void close() throws SQLException { +keyIterator = null; +keyList = null; +hash = null; +resultIterator.close(); +} + +@Override +public Aggregator[] aggregate(Tuple result) { +Aggregator[] rowAggregators = aggregators.getAggregators(); +aggregators.reset(rowAggregators); +aggregators.aggregate(rowAggregators, result); +return rowAggregators; +} + +@Override +public void explain(List planSteps) { +resultIterator.explain(planSteps); +} + +@Override +
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r203926229 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java --- @@ -135,17 +142,24 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators); aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators); } else { -if (!groupBy.isOrderPreserving()) { -int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt( -QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES); -List keyExpressions = groupBy.getKeyExpressions(); +List keyExpressions = groupBy.getKeyExpressions(); +if (groupBy.isOrderPreserving()) { +aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions); --- End diff -- Done. ---
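When `groupBy.isOrderPreserving()` is true, this diff passes the input straight to `ClientGroupedAggregatingResultIterator`, with no sort step and no hash step. A streaming aggregation can work this way because, if rows already arrive grouped by key, a group is complete as soon as the key changes. Only one group's state has to be held at a time. The sketch below shows this general technique under that assumption, with String keys and SUM as simplifications. It is not Phoenix's implementation, and `StreamingGroupBy` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: streaming GROUP BY over input already grouped by key. */
final class StreamingGroupBy {
    static final class Group {
        final String key;
        final long sum;

        Group(String key, long sum) {
            this.key = key;
            this.sum = sum;
        }
    }

    /** keys[i] and values[i] form one row; equal keys must be contiguous. */
    static List<Group> aggregateSorted(String[] keys, long[] values) {
        List<Group> out = new ArrayList<>();
        String currentKey = null;
        long currentSum = 0;
        for (int i = 0; i < keys.length; i++) {
            if (currentKey != null && !currentKey.equals(keys[i])) {
                out.add(new Group(currentKey, currentSum)); // key changed: emit finished group
                currentSum = 0;
            }
            currentKey = keys[i];
            currentSum += values[i];
        }
        if (currentKey != null) {
            out.add(new Group(currentKey, currentSum)); // flush the last group
        }
        return out;
    }
}
```

If the input is not grouped, for example keys `a, b, a`, this yields three groups instead of two. That is why the non-order-preserving branch has to either sort first or use the hash iterator.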
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r203467413 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.TupleUtil; + +/** + * + * This class implements client-side hash aggregation in memory. + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751. 
+ * + */ +public class ClientHashAggregatingResultIterator +implements AggregatingResultIterator { + +private static final int HASH_AGG_INIT_SIZE = 64*1024; +private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0]; +private final ResultIterator resultIterator; +private final Aggregators aggregators; +private final List groupByExpressions; +private HashMap hash; +private List keyList; +private Iterator keyIterator; + +public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List groupByExpressions) { +if (resultIterator == null) throw new NullPointerException(); +if (aggregators == null) throw new NullPointerException(); +if (groupByExpressions == null) throw new NullPointerException(); +this.resultIterator = resultIterator; +this.aggregators = aggregators; +this.groupByExpressions = groupByExpressions; +} + +@Override +public Tuple next() throws SQLException { +if (keyIterator == null) { +populateHash(); +sortKeys(); +keyIterator = keyList.iterator(); +} + +if (!keyIterator.hasNext()) { +return null; +} + +ImmutableBytesWritable key = keyIterator.next(); +Aggregator[] rowAggregators = hash.get(key); +byte[] value = aggregators.toBytes(rowAggregators); +Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length)); +return tuple; +} + +@Override +public void close() throws SQLException { +keyIterator = null; +keyList = null; +hash = null; +resultIterator.close(); +} + +@Override +public Aggregator[] aggregate(Tuple result) { +Aggregator[] rowAggregators = aggregators.getAggregators(); +aggregators.reset(rowAggregators); +aggregators.aggregate(rowAggregators, result); +return rowAggregators; +} + +@Override +public void explain(List planSteps) { +resultIterator.explain(planSteps); +} + +@Override +public String toString() { +return "ClientHashAggregatingResul
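In this version, `next()` fills the hash and then calls `sortKeys()` before returning any row. To emit groups in the same order a sort-based plan would, serialized keys must be compared the way HBase compares row keys: lexicographically, with each byte treated as unsigned. Here is a hypothetical sketch of such a comparator over plain `byte[]` keys. `UnsignedKeyOrder` is an illustrative name and is not code from the PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch: order serialized group keys in unsigned lexicographic byte order. */
final class UnsignedKeyOrder {
    /** Compares byte by byte as unsigned values (0..255); a proper prefix sorts first. */
    static final Comparator<byte[]> UNSIGNED_LEXICOGRAPHIC = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    };

    /** Returns a sorted copy, leaving the caller's list untouched. */
    static List<byte[]> sortedKeys(List<byte[]> keys) {
        List<byte[]> copy = new ArrayList<>(keys);
        copy.sort(UNSIGNED_LEXICOGRAPHIC);
        return copy;
    }
}
```

A signed comparison would place `0x80` (which is -128 as a Java `byte`) before `0x01`, producing an order that disagrees with the row-key order.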
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r203099292 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.TupleUtil; + +/** + * + * This class implements client-side hash aggregation in memory. + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751. 
+ * + */ +public class ClientHashAggregatingResultIterator +implements AggregatingResultIterator { + +private static final int HASH_AGG_INIT_SIZE = 64*1024; +private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0]; +private final ResultIterator resultIterator; +private final Aggregators aggregators; +private final List groupByExpressions; +private HashMap hash; +private List keyList; +private Iterator keyIterator; + +public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List groupByExpressions) { +if (resultIterator == null) throw new NullPointerException(); +if (aggregators == null) throw new NullPointerException(); +if (groupByExpressions == null) throw new NullPointerException(); +this.resultIterator = resultIterator; +this.aggregators = aggregators; +this.groupByExpressions = groupByExpressions; +} + +@Override +public Tuple next() throws SQLException { +if (keyIterator == null) { +populateHash(); +sortKeys(); +keyIterator = keyList.iterator(); +} + +if (!keyIterator.hasNext()) { +return null; +} + +ImmutableBytesWritable key = keyIterator.next(); +Aggregator[] rowAggregators = hash.get(key); +byte[] value = aggregators.toBytes(rowAggregators); +Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length)); +return tuple; +} + +@Override +public void close() throws SQLException { +keyIterator = null; +keyList = null; +hash = null; +resultIterator.close(); +} + +@Override +public Aggregator[] aggregate(Tuple result) { +Aggregator[] rowAggregators = aggregators.getAggregators(); +aggregators.reset(rowAggregators); +aggregators.aggregate(rowAggregators, result); +return rowAggregators; +} + +@Override +public void explain(List planSteps) { +resultIterator.explain(planSteps); +} + +@Override +public String toString() { +return "ClientHashAggregatingResul
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r202112790 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.TupleUtil; + +/** + * + * This class implements client-side hash aggregation in memory. + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751. 
+ * + */ +public class ClientHashAggregatingResultIterator +implements AggregatingResultIterator { + +private static final int HASH_AGG_INIT_SIZE = 64*1024; +private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0]; +private final ResultIterator resultIterator; +private final Aggregators aggregators; +private final List groupByExpressions; +private HashMap hash; +private List keyList; +private Iterator keyIterator; + +public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List groupByExpressions) { +if (resultIterator == null) throw new NullPointerException(); +if (aggregators == null) throw new NullPointerException(); +if (groupByExpressions == null) throw new NullPointerException(); +this.resultIterator = resultIterator; +this.aggregators = aggregators; +this.groupByExpressions = groupByExpressions; +} + +@Override +public Tuple next() throws SQLException { +if (keyIterator == null) { +populateHash(); +sortKeys(); +keyIterator = keyList.iterator(); +} + +if (!keyIterator.hasNext()) { +return null; +} + +ImmutableBytesWritable key = keyIterator.next(); +Aggregator[] rowAggregators = hash.get(key); +byte[] value = aggregators.toBytes(rowAggregators); +Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length)); +return tuple; +} + +@Override +public void close() throws SQLException { +keyIterator = null; +keyList = null; +hash = null; +resultIterator.close(); +} + +@Override +public Aggregator[] aggregate(Tuple result) { +Aggregator[] rowAggregators = aggregators.getAggregators(); +aggregators.reset(rowAggregators); +aggregators.aggregate(rowAggregators, result); +return rowAggregators; +} + +@Override +public void explain(List planSteps) { +resultIterator.explain(planSteps); +} + +@Override +public String toString() { +return "ClientHashAggregatingResul
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r201562225 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.TupleUtil; + +/** + * + * This class implements client-side hash aggregation in memory. + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751. 
+ * + */ +public class ClientHashAggregatingResultIterator +implements AggregatingResultIterator { + +private static final int HASH_AGG_INIT_SIZE = 64*1024; +private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0]; +private final ResultIterator resultIterator; +private final Aggregators aggregators; +private final List groupByExpressions; +private HashMap hash; +private List keyList; +private Iterator keyIterator; + +public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List groupByExpressions) { +if (resultIterator == null) throw new NullPointerException(); +if (aggregators == null) throw new NullPointerException(); +if (groupByExpressions == null) throw new NullPointerException(); +this.resultIterator = resultIterator; +this.aggregators = aggregators; +this.groupByExpressions = groupByExpressions; +} + +@Override +public Tuple next() throws SQLException { +if (keyIterator == null) { +populateHash(); --- End diff -- Made the side effects more explicit. ---
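"Made the side effects more explicit" corresponds to the later revision, where `populateHash()` and `sortKeys()` return their results and `next()` assigns them (`hash = populateHash(); keyList = sortKeys();`), rather than the helpers mutating fields silently. Below is a minimal sketch of that shape for a lazily built, sorted hash aggregation. `LazyHashAggIterator`, the `{key, value}` string rows, and the `"key=sum"` output are hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical iterator: consumes all input on the first next() call, then
 * returns "key=sum" strings in key order. The helpers are static and return
 * their results, so every state change is visible in next().
 */
final class LazyHashAggIterator {
    private final Iterator<String[]> input; // each row: {key, numericValue}
    private Map<String, Long> hash;
    private Iterator<String> keyIterator;

    LazyHashAggIterator(Iterator<String[]> input) {
        this.input = input;
    }

    String next() {
        if (keyIterator == null) {
            hash = populateHash(input); // assignment happens here, not inside the helper
            keyIterator = sortKeys(hash).iterator();
        }
        if (!keyIterator.hasNext()) {
            return null; // end of results, mirroring the null-terminated next() in the diff
        }
        String key = keyIterator.next();
        return key + "=" + hash.get(key);
    }

    private static Map<String, Long> populateHash(Iterator<String[]> rows) {
        Map<String, Long> result = new HashMap<>();
        while (rows.hasNext()) {
            String[] row = rows.next();
            result.merge(row[0], Long.parseLong(row[1]), Long::sum);
        }
        return result;
    }

    private static List<String> sortKeys(Map<String, Long> hash) {
        List<String> keys = new ArrayList<>(hash.keySet());
        Collections.sort(keys);
        return keys;
    }
}
```

Making the helpers static also means the compiler rejects any attempt to touch instance fields from inside them.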
[GitHub] phoenix pull request #308: Client-side hash aggregation
Github user geraldss commented on a diff in the pull request: https://github.com/apache/phoenix/pull/308#discussion_r201562154 --- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; +import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.expression.Expression; +import org.apache.phoenix.expression.aggregator.Aggregator; +import org.apache.phoenix.expression.aggregator.Aggregators; +import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; +import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.KeyValueUtil; +import org.apache.phoenix.util.TupleUtil; + +/** + * + * This class implements client-side hash aggregation in memory. + * Issue https://issues.apache.org/jira/browse/PHOENIX-4751. + * + */ +public class ClientHashAggregatingResultIterator +implements AggregatingResultIterator { + +private static final int HASH_AGG_INIT_SIZE = 64*1024; +private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0]; +private final ResultIterator resultIterator; +private final Aggregators aggregators; +private final List groupByExpressions; +private HashMap hash; +private List keyList; +private Iterator keyIterator; + +public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List groupByExpressions) { +if (resultIterator == null) throw new NullPointerException(); --- End diff -- Done. ---
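The "Done." here refers to replacing the hand-written `if (x == null) throw new NullPointerException();` checks with `Objects.requireNonNull`, as seen in later revisions. `requireNonNull` returns its argument, and an overload accepts a message, so the check and the assignment can be folded together and the resulting NPE names the bad parameter. The class below is a hypothetical illustration, not the PR's constructor.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical constructor showing the Objects.requireNonNull idiom. */
final class NullCheckedIterator {
    private final Object aggregators;
    private final List<String> groupByExpressions;

    NullCheckedIterator(Object aggregators, List<String> groupByExpressions) {
        // Each call throws NullPointerException with the given message if the argument is null,
        // and otherwise returns the argument so it can be assigned directly.
        this.aggregators = Objects.requireNonNull(aggregators, "aggregators");
        this.groupByExpressions = Objects.requireNonNull(groupByExpressions, "groupByExpressions");
    }

    int groupByCount() { return groupByExpressions.size(); }
}
```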
[GitHub] phoenix issue #308: Client-side hash aggregation
Github user geraldss commented on the issue: https://github.com/apache/phoenix/pull/308 @joshelser Thanks for the review. I can make these changes now and push them, or should I wait for other reviewers? ---
[GitHub] phoenix pull request #308: Client-side hash aggregation
GitHub user geraldss opened a pull request: https://github.com/apache/phoenix/pull/308

Client-side hash aggregation

Client-side hash aggregation for use with sort-merge join. Implements https://issues.apache.org/jira/browse/PHOENIX-4751

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/geraldss/phoenix master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/phoenix/pull/308.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #308

commit c8acc6cb39e222a5206c79566552c5c27cbe27f1
Author: Gerald Sangudi
Date: 2018-06-14T19:49:30Z

PHOENIX-4751 Add HASH_AGGREGATE hint

commit a261b3f94f753b4a8d6baaad6168e76f97d76bb6
Author: Gerald Sangudi
Date: 2018-06-16T04:17:32Z

PHOENIX-4751 Begin implementation of client hash aggregation

commit 863d24e34a83282f90d5d2db05522b678dfced74
Author: Rajeshbabu Chintaguntla
Date: 2018-06-15T22:38:44Z

PHOENIX-4786 Reduce log level to debug when logging new aggregate row key found and added results for scan ordered queries(Rajeshbabu)

commit cfae7ddcfa5b58a367cd0c57c23f394ceb9f1259
Author: Gerald Sangudi
Date: 2018-06-16T04:55:00Z

Merge remote-tracking branch 'upstream/master'

commit 1f453308a24be49a8036292671d51eb25137d680
Author: Gerald Sangudi
Date: 2018-06-20T17:47:34Z

PHOENIX-4751 Generated aggregated results

commit 66aaacfd989c63e18fb9a5c5b9e133519ab93507
Author: Gerald Sangudi
Date: 2018-06-24T23:18:14Z

PHOENIX-4751 Sort results of client hash aggregation

commit a6c2b7ce738710cfdffc1e9e4d1d234d2090a225
Author: James Taylor
Date: 2018-06-18T13:00:02Z

PHOENIX-4789 Exception when setting TTL on Tephra transactional table

commit fba4196fcace83d4e42e902d2cb6295bb519ed39
Author: Ankit Singhal
Date: 2018-06-21T23:11:02Z

PHOENIX-4785 Unable to write to table if index is made active during retry

commit 05de081b386c502b6c90ff24357ed7dbbc6dedd2
Author: Gerald Sangudi
Date: 2018-06-29T05:01:55Z

PHOENIX-4751 Add integration test for client hash aggregation

commit b7960d0daedc6ce3c2fbcf0794e4a95639d7ba3c
Author: Gerald Sangudi
Date: 2018-06-30T00:03:59Z

PHOENIX-4751 Fix and run integration tests for query results

commit a3629ac64b90c117f5caceddbb45fb9dc14649b8
Author: Gerald Sangudi
Date: 2018-06-30T06:22:43Z

PHOENIX-4751 Add integration test for EXPLAIN

commit 3aa85d5c04309f6e0c5167c002e9dcb6091ea757
Author: Gerald Sangudi
Date: 2018-06-30T17:13:17Z

PHOENIX-4751 Verify EXPLAIN plan for both salted and unsalted ---