[GitHub] phoenix issue #308: Client-side hash aggregation

2018-07-30 Thread geraldss
Github user geraldss commented on the issue:

https://github.com/apache/phoenix/pull/308
  
Thanks @JamesRTaylor for the reviews and suggestions.


---


[GitHub] phoenix issue #308: Client-side hash aggregation

2018-07-30 Thread geraldss
Github user geraldss commented on the issue:

https://github.com/apache/phoenix/pull/308
  
@JamesRTaylor - Handled reverse sort; added explanatory comments about 
sorting; added tests for sorting and non-sorting, including EXPLAIN.

Pushed changes; uploaded new .patch file; resubmitted patch.


---


[GitHub] phoenix issue #308: Client-side hash aggregation

2018-07-30 Thread geraldss
Github user geraldss commented on the issue:

https://github.com/apache/phoenix/pull/308
  
@JamesRTaylor - Done. I switched the PR to 4.x-HBase-1.4, attached the 
.patch file, and issued "Submit Patch".


---


[GitHub] phoenix issue #308: Client-side hash aggregation

2018-07-27 Thread geraldss
Github user geraldss commented on the issue:

https://github.com/apache/phoenix/pull/308
  
@JamesRTaylor -- ok, I will submit a new PR. Can you comment on my last 
push and comments?


---


[GitHub] phoenix issue #308: Client-side hash aggregation

2018-07-26 Thread geraldss
Github user geraldss commented on the issue:

https://github.com/apache/phoenix/pull/308
  
Hi @JamesRTaylor, I pushed another change and replied with some comments. 
Please review, thanks.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-25 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r205297065
  
--- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
@@ -135,17 +142,25 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
 aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
 aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
 } else {
-if (!groupBy.isOrderPreserving()) {
-int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
-QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-List<Expression> keyExpressions = groupBy.getKeyExpressions();
+List<Expression> keyExpressions = groupBy.getKeyExpressions();
+if (groupBy.isOrderPreserving()) {
+aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
+} else {
+int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt
+(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
 List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
 for (Expression keyExpression : keyExpressions) {
 keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
 }
-iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
+
+if (useHashAgg) {
+boolean sort = orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY;
+aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions, sort);
--- End diff --

Same as the previous comment: the GROUP BY cannot produce a reverse sort.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-25 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r205296815
  
--- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
@@ -135,17 +142,25 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
 aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
 aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
 } else {
-if (!groupBy.isOrderPreserving()) {
-int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
-QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-List<Expression> keyExpressions = groupBy.getKeyExpressions();
+List<Expression> keyExpressions = groupBy.getKeyExpressions();
+if (groupBy.isOrderPreserving()) {
+aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
+} else {
+int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt
+(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
 List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
 for (Expression keyExpression : keyExpressions) {
 keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
 }
-iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
+
+if (useHashAgg) {
+boolean sort = orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY;
--- End diff --

The GROUP BY cannot produce REV_ROW_KEY_ORDER_BY, because a GROUP BY cannot 
specify or produce descending keys. This is a pre-existing assumption and design 
choice in SORT_MERGE_JOIN: the SORT_MERGE_JOIN creates its own forward sort, and 
its Tracker reports a forward sort.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-25 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r205296403
  
--- Diff: phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientHashAggregateIT.java ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.Test;
+
+public class ClientHashAggregateIT extends ParallelStatsDisabledIT {
--- End diff --

The current tests cover the case where a forward sort is required. My comment 
below addresses the reverse sort.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-25 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r205296128
  
--- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
@@ -183,13 +198,15 @@ public ExplainPlan getExplainPlan() throws SQLException {
 if (where != null) {
 planSteps.add("CLIENT FILTER BY " + where.toString());
 }
-if (!groupBy.isEmpty()) {
-if (!groupBy.isOrderPreserving()) {
-planSteps.add("CLIENT SORTED BY " + groupBy.getKeyExpressions().toString());
-}
+if (groupBy.isEmpty()) {
+planSteps.add("CLIENT AGGREGATE INTO SINGLE ROW");
+} else if (groupBy.isOrderPreserving()) {
 planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString());
+} else if (useHashAgg) {
+planSteps.add("CLIENT HASH AGGREGATE INTO DISTINCT ROWS BY " + groupBy.getExpressions().toString());
--- End diff --

Done.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-25 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r205206944
  
--- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
@@ -135,17 +142,24 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
 aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
 aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
 } else {
-if (!groupBy.isOrderPreserving()) {
-int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
-QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-List<Expression> keyExpressions = groupBy.getKeyExpressions();
+List<Expression> keyExpressions = groupBy.getKeyExpressions();
+if (groupBy.isOrderPreserving()) {
+aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
+} else {
+int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt
+(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
 List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
 for (Expression keyExpression : keyExpressions) {
 keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
 }
-iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
+
+if (useHashAgg) {
+aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions);
+} else {
+iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
+aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
+}
 }
-aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, groupBy.getKeyExpressions());
 aggResultIterator = new GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
 }
 
--- End diff --

Hi @JamesRTaylor - please review. The sort is now done only when necessary. 
Thanks.
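To make the two non-order-preserving paths in this hunk concrete, here is a minimal, self-contained sketch. It is not Phoenix code. It assumes rows simplified to a String group key and a long value aggregated with SUM, and `AggregationPathsSketch`, `Row`, `sortThenStream`, and `hashAggregate` are hypothetical names. `sortThenStream` mirrors the pre-existing path: every input row is sorted, then runs of equal adjacent keys are folded, which is roughly what OrderedResultIterator followed by ClientGroupedAggregatingResultIterator does. `hashAggregate` mirrors the hash path: there is no pre-sort, and only the distinct group keys are ordered, and only when the caller needs key order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the two client-side GROUP BY paths; not Phoenix's Tuple/Aggregator API. */
public class AggregationPathsSketch {

    /** Simplified input row: one group-by key and one value to SUM (an assumption for illustration). */
    public static final class Row {
        final String key;
        final long value;

        public Row(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Pre-existing path: sort all N rows by key, then fold runs of adjacent rows with equal keys. */
    public static LinkedHashMap<String, Long> sortThenStream(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparing((Row r) -> r.key)); // sorts every input row
        LinkedHashMap<String, Long> out = new LinkedHashMap<>();
        String current = null;
        long sum = 0;
        for (Row r : sorted) {
            if (current != null && !current.equals(r.key)) {
                out.put(current, sum); // key changed, so the previous group is complete
                sum = 0;
            }
            current = r.key;
            sum += r.value;
        }
        if (current != null) {
            out.put(current, sum); // flush the last group
        }
        return out;
    }

    /** Hash path: aggregate without a pre-sort, then sort only the distinct keys if key order is required. */
    public static LinkedHashMap<String, Long> hashAggregate(List<Row> rows, boolean sort) {
        Map<String, Long> hash = new HashMap<>();
        for (Row r : rows) {
            hash.merge(r.key, r.value, Long::sum);
        }
        List<String> keys = new ArrayList<>(hash.keySet());
        if (sort) {
            keys.sort(Comparator.naturalOrder()); // sorts only the G distinct group keys
        }
        LinkedHashMap<String, Long> out = new LinkedHashMap<>();
        for (String key : keys) {
            out.put(key, hash.get(key));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("b", 2), new Row("a", 1), new Row("b", 3), new Row("c", 4), new Row("a", 5));
        System.out.println(sortThenStream(rows));      // {a=6, b=5, c=4}
        System.out.println(hashAggregate(rows, true)); // {a=6, b=5, c=4}
    }
}
```

When `sort` is true, both paths emit the same groups in the same key order. When `sort` is false, the hash path returns the same groups in HashMap iteration order, which is unspecified.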


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-23 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r204495020
  
--- Diff: phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java ---
@@ -135,17 +142,24 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
 aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
 aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
 } else {
-if (!groupBy.isOrderPreserving()) {
-int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
-QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-List<Expression> keyExpressions = groupBy.getKeyExpressions();
+List<Expression> keyExpressions = groupBy.getKeyExpressions();
+if (groupBy.isOrderPreserving()) {
+aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
+} else {
+int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt
+(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
 List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
 for (Expression keyExpression : keyExpressions) {
 keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
 }
-iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
+
+if (useHashAgg) {
+aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions);
+} else {
+iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
+aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
+}
 }
-aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, groupBy.getKeyExpressions());
 aggResultIterator = new GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
 }
 
--- End diff --

@JamesRTaylor - Done. Please review. Thanks.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-20 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r204197232
  
--- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SizedUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List<Expression> groupByExpressions;
+private final int thresholdBytes;
+private final MemoryChunk memoryChunk;
+private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+private List<ImmutableBytesWritable> keyList;
+private Iterator<ImmutableBytesWritable> keyIterator;
+
+public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators,
+   List<Expression> groupByExpressions, int thresholdBytes) {
+
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
--- End diff --

@JamesRTaylor - I tried that as well, as part of trying to avoid this 
sort. That is, I tried catching this special case in ClientAggregatePlan and 
wrapping an OrderedAggregatingResultIterator there. It did not work, which is 
why I relented and implemented the sort this way.

One other detail: for this special case, this sort is faster than the 
generic sort, because it is less general. Bottom line: I have been unable to 
make the patch correct without this sort.
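One plausible reading of "less general" is this: the special-case sort can order the already-serialized group-by keys (the ImmutableBytesWritable map keys in the diff) by their raw bytes, instead of evaluating OrderByExpressions for each row as the generic OrderedResultIterator path does. The sketch below assumes that reading. `RawKeySortSketch` and `sortKeys` are hypothetical names. It compares keys as unsigned bytes, which is how HBase orders row keys; a signed comparison would wrongly place 0x80 before 0x01. It requires Java 9+ for `Arrays.compareUnsigned`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch: order serialized group-by keys by raw bytes, compared as unsigned values. */
public class RawKeySortSketch {

    // Unsigned lexicographic comparison (Java 9+); a key that is a proper prefix of another sorts first.
    private static final Comparator<byte[]> UNSIGNED_BYTES = Arrays::compareUnsigned;

    /** Returns a new list with the keys in forward (ascending) byte order; the input list is not modified. */
    public static List<byte[]> sortKeys(List<byte[]> keys) {
        List<byte[]> sorted = new ArrayList<>(keys);
        sorted.sort(UNSIGNED_BYTES);
        return sorted;
    }

    public static void main(String[] args) {
        List<byte[]> keys = List.of(new byte[] {(byte) 0x80}, new byte[] {0x01, 0x02}, new byte[] {0x01});
        for (byte[] key : sortKeys(keys)) {
            System.out.println(Arrays.toString(key)); // [1], then [1, 2], then [-128]
        }
    }
}
```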


---


[GitHub] phoenix issue #308: Client-side hash aggregation

2018-07-20 Thread geraldss
Github user geraldss commented on the issue:

https://github.com/apache/phoenix/pull/308
  
@JamesRTaylor - pushed another change per your feedback. Fingers crossed :) 
Thanks.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-20 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r20419
  
--- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
@@ -0,0 +1,189 @@
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List<Expression> groupByExpressions;
+private final int thresholdBytes;
+private final MemoryChunk memoryChunk;
+private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+private List<ImmutableBytesWritable> keyList;
+private Iterator<ImmutableBytesWritable> keyIterator;
+
+public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators,
+   List<Expression> groupByExpressions, int thresholdBytes) {
+
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
--- End diff --

I tried avoiding the sort, but could not make the code work without it. In 
my use case, when an ORDER BY followed the grouping and that ORDER BY was on the 
primary key, it was not applied; when the ORDER BY was not on the primary key, 
it was applied.

I left this sort in because (1) it introduces no error; (2) the sorting 
happens after the grouping, so it is more efficient than the current solution, 
which sorts before the grouping; and (3) it avoids the problem above.
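Point (2) is a cost argument. The pre-existing path sorts all N input rows, while the hash path sorts only the G distinct group keys. That is roughly O(N log N) versus O(G log G) comparisons, and the gap matters when G is much smaller than N. The sketch below counts comparator calls made by the JDK sort in both cases. It is a hypothetical illustration (`SortCostSketch` is not Phoenix code): integer keys stand in for serialized group-by keys, and spooling to disk above the spool threshold is ignored. Exact counts depend on the data and on the JDK's sort implementation, so no specific numbers are claimed.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch comparing the cost of sorting all rows versus sorting only the distinct group keys. */
public class SortCostSketch {

    /** Sorts a copy of the values and returns how many times the comparator was called. */
    public static long countComparisons(List<Integer> values) {
        AtomicLong calls = new AtomicLong();
        Comparator<Integer> counting = (a, b) -> {
            calls.incrementAndGet();
            return Integer.compare(a, b);
        };
        List<Integer> copy = new ArrayList<>(values);
        copy.sort(counting);
        return calls.get();
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        List<Integer> rowKeys = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            rowKeys.add(random.nextInt(100)); // N = 100,000 rows spread over at most 100 groups
        }
        List<Integer> groupKeys = new ArrayList<>(new HashSet<>(rowKeys)); // the G distinct keys after grouping
        System.out.println("sort before grouping (N rows):  " + countComparisons(rowKeys));
        System.out.println("sort after grouping (G groups): " + countComparisons(groupKeys));
    }
}
```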


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-20 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r204192760
  
--- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
@@ -0,0 +1,189 @@
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List<Expression> groupByExpressions;
+private final int thresholdBytes;
+private final MemoryChunk memoryChunk;
+private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+private List<ImmutableBytesWritable> keyList;
+private Iterator<ImmutableBytesWritable> keyIterator;
+
+public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators,
+   List<Expression> groupByExpressions, int thresholdBytes) {
+
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
+keyIterator = keyList.iterator();
+}
+
+if (!keyIterator.hasNext()) {
+return null;
+}
+
+ImmutableBytesWritable key = keyIterator.next();
+Aggregator[] rowAggregators = hash.get(key);
+byte[] value = aggregators.toBytes(rowAggregators);
+Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+return tuple;
+}
+
+@Override
+public void close() throws SQLException {
+keyIterator = null;
+keyList = null;
+hash = null;
+  

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-20 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r204192752
  
--- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
@@ -0,0 +1,189 @@
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List<Expression> groupByExpressions;
+private final int thresholdBytes;
+private final MemoryChunk memoryChunk;
+private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+private List<ImmutableBytesWritable> keyList;
+private Iterator<ImmutableBytesWritable> keyIterator;
+
+public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators,
+   List<Expression> groupByExpressions, int thresholdBytes) {
+
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
+keyIterator = keyList.iterator();
+}
+
+if (!keyIterator.hasNext()) {
+return null;
+}
+
+ImmutableBytesWritable key = keyIterator.next();
+Aggregator[] rowAggregators = hash.get(key);
+byte[] value = aggregators.toBytes(rowAggregators);
+Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+return tuple;
+}
+
+@Override
+public void close() throws SQLException {
+keyIterator = null;
+keyList = null;
+hash = null;
+  

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-20 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r204192739
  
--- Diff: phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java ---
@@ -0,0 +1,189 @@
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+        implements AggregatingResultIterator {
+
+    private static final int HASH_AGG_INIT_SIZE = 64*1024;
+    private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
+    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+    private final ResultIterator resultIterator;
+    private final Aggregators aggregators;
+    private final List<Expression> groupByExpressions;
+    private final int thresholdBytes;
+    private final MemoryChunk memoryChunk;
+    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+    private List<ImmutableBytesWritable> keyList;
+    private Iterator<ImmutableBytesWritable> keyIterator;
+
+    public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators,
+                                               List<Expression> groupByExpressions, int thresholdBytes) {
+
+        Objects.requireNonNull(resultIterator);
+        Objects.requireNonNull(aggregators);
+        Objects.requireNonNull(groupByExpressions);
+        this.resultIterator = resultIterator;
+        this.aggregators = aggregators;
+        this.groupByExpressions = groupByExpressions;
+        this.thresholdBytes = thresholdBytes;
+        memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        if (keyIterator == null) {
+            hash = populateHash();
+            keyList = sortKeys();
+            keyIterator = keyList.iterator();
+        }
+
+        if (!keyIterator.hasNext()) {
+            return null;
+        }
+
+        ImmutableBytesWritable key = keyIterator.next();
+        Aggregator[] rowAggregators = hash.get(key);
+        byte[] value = aggregators.toBytes(rowAggregators);
+        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+        return tuple;
+    }
+
+    @Override
+    public void close() throws SQLException {
+        keyIterator = null;
+        keyList = null;
+        hash = null;
+  

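This revision adds a `thresholdBytes` field and allocates a 64 KB `MemoryChunk` from the connection's `MemoryManager`, but the excerpt stops before showing how either is used. One plausible way to enforce such a limit is sketched below, assuming a fixed per-group overhead and a fail-fast policy. `MemoryBudget`, `GROUP_OVERHEAD_BYTES`, and the choice to throw instead of spilling to disk are all hypothetical, and a plain counter stands in for Phoenix's memory manager.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: bound the estimated memory of an in-memory hash aggregation.
class MemoryBudget {
    // Assumed fixed per-group overhead (map entry, key object, accumulator);
    // a real implementation would measure or estimate sizes more carefully.
    static final long GROUP_OVERHEAD_BYTES = 64;

    private final long thresholdBytes;
    private long usedBytes;

    MemoryBudget(long thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    // Charge the budget for one new group; fail fast rather than risk an OutOfMemoryError.
    void reserve(long keyBytes) {
        long needed = keyBytes + GROUP_OVERHEAD_BYTES;
        if (usedBytes + needed > thresholdBytes) {
            throw new IllegalStateException("hash aggregation would exceed " + thresholdBytes + " bytes");
        }
        usedBytes += needed;
    }

    long usedBytes() {
        return usedBytes;
    }

    // Count rows per key, reserving memory only when a key is seen for the first time.
    static Map<String, Long> countByKey(Iterable<String> keys, MemoryBudget budget) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : keys) {
            if (!counts.containsKey(key)) {
                budget.reserve(key.length()); // length() approximates bytes for ASCII keys
            }
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }
}
```

Only the first occurrence of each key is charged, since later rows update an existing group in place; memory pressure therefore tracks the number of distinct groups.
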
[GitHub] phoenix issue #308: Client-side hash aggregation

2018-07-19 Thread geraldss
Github user geraldss commented on the issue:

https://github.com/apache/phoenix/pull/308
  
@JamesRTaylor - I made the changes and they are ready for review. Thanks.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-19 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r203926206
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,173 @@
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+        implements AggregatingResultIterator {
+
+    private static final int HASH_AGG_INIT_SIZE = 64*1024;
+    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+    private final ResultIterator resultIterator;
+    private final Aggregators aggregators;
+    private final List<Expression> groupByExpressions;
+    private final int thresholdBytes;
+    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+    private List<ImmutableBytesWritable> keyList;
+    private Iterator<ImmutableBytesWritable> keyIterator;
+
+    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions, int thresholdBytes) {
+        Objects.requireNonNull(resultIterator);
+        Objects.requireNonNull(aggregators);
+        Objects.requireNonNull(groupByExpressions);
+        this.resultIterator = resultIterator;
+        this.aggregators = aggregators;
+        this.groupByExpressions = groupByExpressions;
+        this.thresholdBytes = thresholdBytes;
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        if (keyIterator == null) {
+            hash = populateHash();
+            keyList = sortKeys();
+            keyIterator = keyList.iterator();
+        }
+
+        if (!keyIterator.hasNext()) {
+            return null;
+        }
+
+        ImmutableBytesWritable key = keyIterator.next();
+        Aggregator[] rowAggregators = hash.get(key);
+        byte[] value = aggregators.toBytes(rowAggregators);
+        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+        return tuple;
+    }
+
+    @Override
+    public void close() throws SQLException {
+        keyIterator = null;
+        keyList = null;
+        hash = null;
+        resultIterator.close();
+    }
+
+    @Override
+    public Aggregator[] aggregate(Tuple result) {
+        Aggregator[] rowAggregators = aggregators.getAggregators();
+        aggregators.reset(rowAggregators);
+        aggregators.aggregate(rowAggregators, result);
+        return rowAggregators;
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        resultIterator.explain(planSteps);
+    }
+
+    @Override
+ 

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-19 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r203926179
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,173 @@
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+        implements AggregatingResultIterator {
+
+    private static final int HASH_AGG_INIT_SIZE = 64*1024;
+    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+    private final ResultIterator resultIterator;
+    private final Aggregators aggregators;
+    private final List<Expression> groupByExpressions;
+    private final int thresholdBytes;
+    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+    private List<ImmutableBytesWritable> keyList;
+    private Iterator<ImmutableBytesWritable> keyIterator;
+
+    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions, int thresholdBytes) {
+        Objects.requireNonNull(resultIterator);
+        Objects.requireNonNull(aggregators);
+        Objects.requireNonNull(groupByExpressions);
+        this.resultIterator = resultIterator;
+        this.aggregators = aggregators;
+        this.groupByExpressions = groupByExpressions;
+        this.thresholdBytes = thresholdBytes;
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        if (keyIterator == null) {
+            hash = populateHash();
+            keyList = sortKeys();
+            keyIterator = keyList.iterator();
+        }
+
+        if (!keyIterator.hasNext()) {
+            return null;
+        }
+
+        ImmutableBytesWritable key = keyIterator.next();
+        Aggregator[] rowAggregators = hash.get(key);
+        byte[] value = aggregators.toBytes(rowAggregators);
+        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+        return tuple;
+    }
+
+    @Override
+    public void close() throws SQLException {
+        keyIterator = null;
+        keyList = null;
+        hash = null;
+        resultIterator.close();
+    }
+
+    @Override
+    public Aggregator[] aggregate(Tuple result) {
+        Aggregator[] rowAggregators = aggregators.getAggregators();
+        aggregators.reset(rowAggregators);
+        aggregators.aggregate(rowAggregators, result);
+        return rowAggregators;
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        resultIterator.explain(planSteps);
+    }
+
+    @Override
+ 

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-19 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r203926229
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java 
---
@@ -135,17 +142,24 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
             aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator),
                     serverAggregators);
             aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator),
                     clientAggregators);
         } else {
-            if (!groupBy.isOrderPreserving()) {
-                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
-                        QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-                List<Expression> keyExpressions = groupBy.getKeyExpressions();
+            List<Expression> keyExpressions = groupBy.getKeyExpressions();
+            if (groupBy.isOrderPreserving()) {
+                aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
--- End diff --

Done.


---
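
The restructured branch above picks `ClientGroupedAggregatingResultIterator` when `groupBy.isOrderPreserving()` is true. With rows already ordered by the group-by key, a group ends exactly where the key changes, so aggregation can stream while holding a single running total. The standalone sketch below illustrates that idea only; `StreamingGroupBy` and its SUM-per-key output are hypothetical and do not reproduce the Phoenix iterator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: streaming GROUP BY ... SUM over input already ordered by the group key.
// Only the current group's running total is kept in memory.
class StreamingGroupBy {
    static final class Group {
        final String key;
        final long sum;

        Group(String key, long sum) {
            this.key = key;
            this.sum = sum;
        }
    }

    static List<Group> sumOrdered(List<String> keys, List<Long> values) {
        List<Group> out = new ArrayList<>();
        String current = null;
        long sum = 0;
        for (int i = 0; i < keys.size(); i++) {
            String key = keys.get(i);
            // A key change closes the previous group. This is only correct when
            // equal keys are adjacent, i.e. when the input order is preserved.
            if (current != null && !current.equals(key)) {
                out.add(new Group(current, sum));
                sum = 0;
            }
            current = key;
            sum += values.get(i);
        }
        if (current != null) {
            out.add(new Group(current, sum)); // flush the final group
        }
        return out;
    }
}
```

On unordered input such as `a, b, a`, the same loop emits group `a` twice, which is why the non-order-preserving case needs a hash table or a sort first.
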


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-18 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r203467413
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,155 @@
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+        implements AggregatingResultIterator {
+
+    private static final int HASH_AGG_INIT_SIZE = 64*1024;
+    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+    private final ResultIterator resultIterator;
+    private final Aggregators aggregators;
+    private final List<Expression> groupByExpressions;
+    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+    private List<ImmutableBytesWritable> keyList;
+    private Iterator<ImmutableBytesWritable> keyIterator;
+
+    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
+        if (resultIterator == null) throw new NullPointerException();
+        if (aggregators == null) throw new NullPointerException();
+        if (groupByExpressions == null) throw new NullPointerException();
+        this.resultIterator = resultIterator;
+        this.aggregators = aggregators;
+        this.groupByExpressions = groupByExpressions;
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        if (keyIterator == null) {
+            populateHash();
+            sortKeys();
+            keyIterator = keyList.iterator();
+        }
+
+        if (!keyIterator.hasNext()) {
+            return null;
+        }
+
+        ImmutableBytesWritable key = keyIterator.next();
+        Aggregator[] rowAggregators = hash.get(key);
+        byte[] value = aggregators.toBytes(rowAggregators);
+        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+        return tuple;
+    }
+
+    @Override
+    public void close() throws SQLException {
+        keyIterator = null;
+        keyList = null;
+        hash = null;
+        resultIterator.close();
+    }
+
+    @Override
+    public Aggregator[] aggregate(Tuple result) {
+        Aggregator[] rowAggregators = aggregators.getAggregators();
+        aggregators.reset(rowAggregators);
+        aggregators.aggregate(rowAggregators, result);
+        return rowAggregators;
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        resultIterator.explain(planSteps);
+    }
+
+    @Override
+    public String toString() {
+        return "ClientHashAggregatingResul

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-17 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r203099292
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,155 @@
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+        implements AggregatingResultIterator {
+
+    private static final int HASH_AGG_INIT_SIZE = 64*1024;
+    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+    private final ResultIterator resultIterator;
+    private final Aggregators aggregators;
+    private final List<Expression> groupByExpressions;
+    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+    private List<ImmutableBytesWritable> keyList;
+    private Iterator<ImmutableBytesWritable> keyIterator;
+
+    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
+        if (resultIterator == null) throw new NullPointerException();
+        if (aggregators == null) throw new NullPointerException();
+        if (groupByExpressions == null) throw new NullPointerException();
+        this.resultIterator = resultIterator;
+        this.aggregators = aggregators;
+        this.groupByExpressions = groupByExpressions;
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        if (keyIterator == null) {
+            populateHash();
+            sortKeys();
+            keyIterator = keyList.iterator();
+        }
+
+        if (!keyIterator.hasNext()) {
+            return null;
+        }
+
+        ImmutableBytesWritable key = keyIterator.next();
+        Aggregator[] rowAggregators = hash.get(key);
+        byte[] value = aggregators.toBytes(rowAggregators);
+        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+        return tuple;
+    }
+
+    @Override
+    public void close() throws SQLException {
+        keyIterator = null;
+        keyList = null;
+        hash = null;
+        resultIterator.close();
+    }
+
+    @Override
+    public Aggregator[] aggregate(Tuple result) {
+        Aggregator[] rowAggregators = aggregators.getAggregators();
+        aggregators.reset(rowAggregators);
+        aggregators.aggregate(rowAggregators, result);
+        return rowAggregators;
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        resultIterator.explain(planSteps);
+    }
+
+    @Override
+    public String toString() {
+        return "ClientHashAggregatingResul

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-12 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r202112790
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,155 @@
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+        implements AggregatingResultIterator {
+
+    private static final int HASH_AGG_INIT_SIZE = 64*1024;
+    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+    private final ResultIterator resultIterator;
+    private final Aggregators aggregators;
+    private final List<Expression> groupByExpressions;
+    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+    private List<ImmutableBytesWritable> keyList;
+    private Iterator<ImmutableBytesWritable> keyIterator;
+
+    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
+        if (resultIterator == null) throw new NullPointerException();
+        if (aggregators == null) throw new NullPointerException();
+        if (groupByExpressions == null) throw new NullPointerException();
+        this.resultIterator = resultIterator;
+        this.aggregators = aggregators;
+        this.groupByExpressions = groupByExpressions;
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        if (keyIterator == null) {
+            populateHash();
+            sortKeys();
+            keyIterator = keyList.iterator();
+        }
+
+        if (!keyIterator.hasNext()) {
+            return null;
+        }
+
+        ImmutableBytesWritable key = keyIterator.next();
+        Aggregator[] rowAggregators = hash.get(key);
+        byte[] value = aggregators.toBytes(rowAggregators);
+        Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+        return tuple;
+    }
+
+    @Override
+    public void close() throws SQLException {
+        keyIterator = null;
+        keyList = null;
+        hash = null;
+        resultIterator.close();
+    }
+
+    @Override
+    public Aggregator[] aggregate(Tuple result) {
+        Aggregator[] rowAggregators = aggregators.getAggregators();
+        aggregators.reset(rowAggregators);
+        aggregators.aggregate(rowAggregators, result);
+        return rowAggregators;
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        resultIterator.explain(planSteps);
+    }
+
+    @Override
+    public String toString() {
+        return "ClientHashAggregatingResul

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-10 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r201562225
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,155 @@
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+        implements AggregatingResultIterator {
+
+    private static final int HASH_AGG_INIT_SIZE = 64*1024;
+    private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+    private final ResultIterator resultIterator;
+    private final Aggregators aggregators;
+    private final List<Expression> groupByExpressions;
+    private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+    private List<ImmutableBytesWritable> keyList;
+    private Iterator<ImmutableBytesWritable> keyIterator;
+
+    public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
+        if (resultIterator == null) throw new NullPointerException();
+        if (aggregators == null) throw new NullPointerException();
+        if (groupByExpressions == null) throw new NullPointerException();
+        this.resultIterator = resultIterator;
+        this.aggregators = aggregators;
+        this.groupByExpressions = groupByExpressions;
+    }
+
+    @Override
+    public Tuple next() throws SQLException {
+        if (keyIterator == null) {
+            populateHash();
--- End diff --

Made the side effects more explicit.


---
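
For comparison, the later revisions quoted earlier in this archive change these calls to `hash = populateHash();` and `keyList = sortKeys();`, so each helper returns its result and `next()` makes every field assignment visible at the call site. The same lazy-initialization pattern can be shown in isolation with the hypothetical `LazySortedIterator` below, which sorts integers instead of aggregating rows; it is an illustration, not code from the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: lazily build iteration state on the first next() call,
// using helpers that return their results instead of assigning fields themselves.
class LazySortedIterator {
    private final List<Integer> input;
    private List<Integer> sorted;         // built on the first next()
    private Iterator<Integer> iterator;   // null until the first next()

    LazySortedIterator(List<Integer> input) {
        this.input = input;
    }

    // Returns null when exhausted, in the style of Phoenix's ResultIterator.next().
    Integer next() {
        if (iterator == null) {
            sorted = sortInput();         // the state change is visible here
            iterator = sorted.iterator();
        }
        return iterator.hasNext() ? iterator.next() : null;
    }

    // Side-effect-free helper: reads input and returns a new sorted list.
    private List<Integer> sortInput() {
        List<Integer> copy = new ArrayList<>(input);
        Collections.sort(copy);
        return copy;
    }
}
```

Keeping the helpers free of field writes means a reader of `next()` sees the full lifecycle of the iterator's state in one place.
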


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-10 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r201562154
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,155 @@
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List groupByExpressions;
+private HashMap hash;
+private List keyList;
+private Iterator keyIterator;
+
+public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List groupByExpressions) {
+if (resultIterator == null) throw new NullPointerException();
--- End diff --

Done.
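The fields in the quoted diff (a `HashMap hash`, a `List keyList`, and an `Iterator keyIterator`, with `ImmutableBytesWritable` imported) suggest a two-phase design: first fill a hash table keyed by the serialized group-by key, then copy the distinct keys into a list that can be sorted before results are returned (compare the "Sort results of client hash aggregation" commit). The following is a standalone sketch under that assumption, not the Phoenix implementation. `ByteKey`, `countByKey`, and `sortedKeys` are hypothetical names; `ByteKey` stands in for a value-equality wrapper such as `ImmutableBytesWritable`, and only COUNT(*) is computed. Keys are sorted in unsigned lexicographic byte order, with an optional reverse flag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SortedHashAggregationSketch {

    /** Hypothetical key wrapper: byte[] has identity equals/hashCode, so it cannot be used directly as a HashMap key. */
    public static final class ByteKey implements Comparable<ByteKey> {
        public final byte[] bytes;

        public ByteKey(byte[] bytes) {
            this.bytes = bytes;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof ByteKey && Arrays.equals(bytes, ((ByteKey) o).bytes);
        }

        @Override
        public int hashCode() {
            return Arrays.hashCode(bytes);
        }

        /** Unsigned lexicographic comparison, the order HBase uses for row keys. */
        @Override
        public int compareTo(ByteKey other) {
            return Arrays.compareUnsigned(bytes, other.bytes);
        }
    }

    /** Hash phase: one pass over the input, counting rows per distinct key. */
    public static Map<ByteKey, Long> countByKey(List<byte[]> groupKeys) {
        Map<ByteKey, Long> hash = new HashMap<>();
        for (byte[] key : groupKeys) {
            hash.merge(new ByteKey(key), 1L, Long::sum);
        }
        return hash;
    }

    /** Output phase: copy the distinct keys into a list and sort it, optionally in reverse. */
    public static List<ByteKey> sortedKeys(Map<ByteKey, ?> hash, boolean reverse) {
        List<ByteKey> keyList = new ArrayList<>(hash.keySet());
        Collections.sort(keyList);
        if (reverse) {
            Collections.reverse(keyList);
        }
        return keyList;
    }

    public static void main(String[] args) {
        List<byte[]> keys = List.of(new byte[] {2}, new byte[] {(byte) 0xFF}, new byte[] {1}, new byte[] {2});
        Map<ByteKey, Long> hash = countByKey(keys);
        for (ByteKey k : sortedKeys(hash, false)) {
            // 0xFF sorts last because it is 255 unsigned; Arrays.toString prints it as the signed value -1.
            System.out.println(Arrays.toString(k.bytes) + " -> " + hash.get(k));
        }
    }
}
```

Using `Arrays.compareUnsigned` (Java 9+) matters here: Java bytes are signed, so a naive signed comparison would place `0xFF` before `0x01` and disagree with HBase row-key order.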


---


[GitHub] phoenix issue #308: Client-side hash aggregation

2018-07-10 Thread geraldss
Github user geraldss commented on the issue:

https://github.com/apache/phoenix/pull/308
  
@joshelser Thanks for the review. I can make these changes now and push 
them, or should I wait for other reviewers?


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-02 Thread geraldss
GitHub user geraldss opened a pull request:

https://github.com/apache/phoenix/pull/308

Client-side hash aggregation

Client-side hash aggregation for use with sort-merge join.

Implements https://issues.apache.org/jira/browse/PHOENIX-4751
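As a rough illustration of the general technique (an interpretation, not code from this PR): rows coming out of a sort-merge join are ordered by the join key, which need not match the GROUP BY key, so rows of one group may not be adjacent. Hash aggregation groups them in a single pass through an in-memory hash table instead of sorting first. The sketch below is standalone Java; `Row`, `Accumulator`, and `aggregate` are hypothetical names, and only COUNT(*) and SUM are modeled.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashAggregationSketch {

    /** Hypothetical input row: a group-by key and a numeric value. */
    public static final class Row {
        final String groupKey;
        final long value;

        public Row(String groupKey, long value) {
            this.groupKey = groupKey;
            this.value = value;
        }
    }

    /** Running state for one group: COUNT(*) and SUM(value). */
    public static final class Accumulator {
        private long count;
        private long sum;

        void add(long value) {
            count++;
            sum += value;
        }

        public long count() { return count; }
        public long sum() { return sum; }
    }

    /**
     * Single pass over unordered input: find (or create) the group's
     * accumulator in the hash table and fold the row into it.
     */
    public static Map<String, Accumulator> aggregate(Iterable<Row> rows) {
        Map<String, Accumulator> hash = new HashMap<>();
        for (Row row : rows) {
            hash.computeIfAbsent(row.groupKey, k -> new Accumulator()).add(row.value);
        }
        return hash;
    }

    public static void main(String[] args) {
        // Rows for the same key are not adjacent, as can happen after a join on a different column.
        List<Row> rows = List.of(
                new Row("b", 5), new Row("a", 1), new Row("b", 2), new Row("a", 3));
        Map<String, Accumulator> groups = aggregate(rows);
        // HashMap iteration order is unspecified; sort afterwards if ordered output is required.
        groups.forEach((key, acc) ->
                System.out.println(key + ": count=" + acc.count() + ", sum=" + acc.sum()));
    }
}
```

The trade-off is memory: every distinct group stays resident until the input is exhausted, whereas sort-then-stream aggregation holds one group at a time after paying for the sort.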


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/geraldss/phoenix master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/phoenix/pull/308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #308


commit c8acc6cb39e222a5206c79566552c5c27cbe27f1
Author: Gerald Sangudi 
Date:   2018-06-14T19:49:30Z

PHOENIX-4751 Add HASH_AGGREGATE hint

commit a261b3f94f753b4a8d6baaad6168e76f97d76bb6
Author: Gerald Sangudi 
Date:   2018-06-16T04:17:32Z

PHOENIX-4751 Begin implementation of client hash aggregation

commit 863d24e34a83282f90d5d2db05522b678dfced74
Author: Rajeshbabu Chintaguntla 
Date:   2018-06-15T22:38:44Z

PHOENIX-4786 Reduce log level to debug when logging new aggregate row key found and added results for scan ordered queries(Rajeshbabu)

commit cfae7ddcfa5b58a367cd0c57c23f394ceb9f1259
Author: Gerald Sangudi 
Date:   2018-06-16T04:55:00Z

Merge remote-tracking branch 'upstream/master'

commit 1f453308a24be49a8036292671d51eb25137d680
Author: Gerald Sangudi 
Date:   2018-06-20T17:47:34Z

PHOENIX-4751 Generated aggregated results

commit 66aaacfd989c63e18fb9a5c5b9e133519ab93507
Author: Gerald Sangudi 
Date:   2018-06-24T23:18:14Z

PHOENIX-4751 Sort results of client hash aggregation

commit a6c2b7ce738710cfdffc1e9e4d1d234d2090a225
Author: James Taylor 
Date:   2018-06-18T13:00:02Z

PHOENIX-4789 Exception when setting TTL on Tephra transactional table

commit fba4196fcace83d4e42e902d2cb6295bb519ed39
Author: Ankit Singhal 
Date:   2018-06-21T23:11:02Z

PHOENIX-4785 Unable to write to table if index is made active during retry

commit 05de081b386c502b6c90ff24357ed7dbbc6dedd2
Author: Gerald Sangudi 
Date:   2018-06-29T05:01:55Z

PHOENIX-4751 Add integration test for client hash aggregation

commit b7960d0daedc6ce3c2fbcf0794e4a95639d7ba3c
Author: Gerald Sangudi 
Date:   2018-06-30T00:03:59Z

PHOENIX-4751 Fix and run integration tests for query results

commit a3629ac64b90c117f5caceddbb45fb9dc14649b8
Author: Gerald Sangudi 
Date:   2018-06-30T06:22:43Z

PHOENIX-4751 Add integration test for EXPLAIN

commit 3aa85d5c04309f6e0c5167c002e9dcb6091ea757
Author: Gerald Sangudi 
Date:   2018-06-30T17:13:17Z

PHOENIX-4751 Verify EXPLAIN plan for both salted and unsalted




---