[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-30 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r206295564
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java 
---
@@ -135,17 +142,25 @@ public ResultIterator iterator(ParallelScanGrouper 
scanGrouper, Scan scan) throw
 aggResultIterator = new 
ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator),
 serverAggregators);
 aggResultIterator = new 
UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator),
 clientAggregators);
 } else {
-if (!groupBy.isOrderPreserving()) {
-int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(
-QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-List keyExpressions = 
groupBy.getKeyExpressions();
+List keyExpressions = groupBy.getKeyExpressions();
+if (groupBy.isOrderPreserving()) {
+aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, keyExpressions);
+} else {
+int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt
+(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
 List keyExpressionOrderBy = 
Lists.newArrayListWithExpectedSize(keyExpressions.size());
 for (Expression keyExpression : keyExpressions) {
 keyExpressionOrderBy.add(new 
OrderByExpression(keyExpression, false, true));
 }
-iterator = new OrderedResultIterator(iterator, 
keyExpressionOrderBy, thresholdBytes, null, null, 
projector.getEstimatedRowByteSize());
+
+if (useHashAgg) {
+boolean sort = orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY;
+aggResultIterator = new 
ClientHashAggregatingResultIterator(context, iterator, serverAggregators, 
keyExpressions, sort);
--- End diff --

Please add a comment about reverse sort and point to the JIRA (create one if 
there isn't one already). It might be best to write the code here so that 
once reverse sort is supported, the hash aggregate will just work.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-30 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r206295347
  
--- Diff: 
phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientHashAggregateIT.java 
---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.Test;
+
+public class ClientHashAggregateIT extends ParallelStatsDisabledIT {
--- End diff --

Unless I'm missing something, this needs a few more tests:
* verify CLIENT SORTED BY is present in the explain plan when a sort is 
required for hash aggregate
* verify it's not in the explain plan when a sort is not required
* verify query results when a sort is not required


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-25 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r205297065
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java 
---
@@ -135,17 +142,25 @@ public ResultIterator iterator(ParallelScanGrouper 
scanGrouper, Scan scan) throw
 aggResultIterator = new 
ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator),
 serverAggregators);
 aggResultIterator = new 
UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator),
 clientAggregators);
 } else {
-if (!groupBy.isOrderPreserving()) {
-int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(
-QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-List keyExpressions = 
groupBy.getKeyExpressions();
+List keyExpressions = groupBy.getKeyExpressions();
+if (groupBy.isOrderPreserving()) {
+aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, keyExpressions);
+} else {
+int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt
+(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
 List keyExpressionOrderBy = 
Lists.newArrayListWithExpectedSize(keyExpressions.size());
 for (Expression keyExpression : keyExpressions) {
 keyExpressionOrderBy.add(new 
OrderByExpression(keyExpression, false, true));
 }
-iterator = new OrderedResultIterator(iterator, 
keyExpressionOrderBy, thresholdBytes, null, null, 
projector.getEstimatedRowByteSize());
+
+if (useHashAgg) {
+boolean sort = orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY;
+aggResultIterator = new 
ClientHashAggregatingResultIterator(context, iterator, serverAggregators, 
keyExpressions, sort);
--- End diff --

Same as for previous comment. The GROUP BY cannot produce a reverse sort.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-25 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r205296815
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java 
---
@@ -135,17 +142,25 @@ public ResultIterator iterator(ParallelScanGrouper 
scanGrouper, Scan scan) throw
 aggResultIterator = new 
ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator),
 serverAggregators);
 aggResultIterator = new 
UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator),
 clientAggregators);
 } else {
-if (!groupBy.isOrderPreserving()) {
-int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(
-QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-List keyExpressions = 
groupBy.getKeyExpressions();
+List keyExpressions = groupBy.getKeyExpressions();
+if (groupBy.isOrderPreserving()) {
+aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, keyExpressions);
+} else {
+int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt
+(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
 List keyExpressionOrderBy = 
Lists.newArrayListWithExpectedSize(keyExpressions.size());
 for (Expression keyExpression : keyExpressions) {
 keyExpressionOrderBy.add(new 
OrderByExpression(keyExpression, false, true));
 }
-iterator = new OrderedResultIterator(iterator, 
keyExpressionOrderBy, thresholdBytes, null, null, 
projector.getEstimatedRowByteSize());
+
+if (useHashAgg) {
+boolean sort = orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY;
--- End diff --

The GROUP BY cannot cause REV_ROW_KEY_ORDER_BY, because the GROUP BY cannot 
specify or produce descending keys. This is a pre-existing assumption and 
design in SORT_MERGE_JOIN. The SORT_MERGE_JOIN creates its own forward sort, 
and its Tracker reports a forward sort.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-25 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r205296403
  
--- Diff: 
phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientHashAggregateIT.java 
---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.Test;
+
+public class ClientHashAggregateIT extends ParallelStatsDisabledIT {
--- End diff --

The current tests cover when forward sort is required. My comment below 
addresses reverse sort.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-25 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r205296128
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java 
---
@@ -183,13 +198,15 @@ public ExplainPlan getExplainPlan() throws 
SQLException {
 if (where != null) {
 planSteps.add("CLIENT FILTER BY " + where.toString());
 }
-if (!groupBy.isEmpty()) {
-if (!groupBy.isOrderPreserving()) {
-planSteps.add("CLIENT SORTED BY " + 
groupBy.getKeyExpressions().toString());
-}
+if (groupBy.isEmpty()) {
+planSteps.add("CLIENT AGGREGATE INTO SINGLE ROW");
+} else if (groupBy.isOrderPreserving()) {
 planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + 
groupBy.getExpressions().toString());
+} else if (useHashAgg) {
+planSteps.add("CLIENT HASH AGGREGATE INTO DISTINCT ROWS BY " + 
groupBy.getExpressions().toString());
--- End diff --

Done.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-25 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r205232207
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java 
---
@@ -183,13 +198,15 @@ public ExplainPlan getExplainPlan() throws 
SQLException {
 if (where != null) {
 planSteps.add("CLIENT FILTER BY " + where.toString());
 }
-if (!groupBy.isEmpty()) {
-if (!groupBy.isOrderPreserving()) {
-planSteps.add("CLIENT SORTED BY " + 
groupBy.getKeyExpressions().toString());
-}
+if (groupBy.isEmpty()) {
+planSteps.add("CLIENT AGGREGATE INTO SINGLE ROW");
+} else if (groupBy.isOrderPreserving()) {
 planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + 
groupBy.getExpressions().toString());
+} else if (useHashAgg) {
+planSteps.add("CLIENT HASH AGGREGATE INTO DISTINCT ROWS BY " + 
groupBy.getExpressions().toString());
--- End diff --

Add a CLIENT SORTED BY line here if sorting is required for hash aggregate.
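
As an illustration of the request above, here is a minimal, self-contained sketch of how the plan steps might be assembled. It is not the Phoenix code: the class `ExplainStepsSketch`, the method `aggregateSteps`, the `sortRequired` flag, and the single `keys` string (standing in for both `getExpressions()` and `getKeyExpressions()`) are hypothetical, and placing the sort line after the hash aggregate line is an assumption based on the hash aggregate sorting its output rather than its input.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the explain-plan branches; not the actual ClientAggregatePlan code. */
final class ExplainStepsSketch {

    /** Returns the client-side aggregation steps for one combination of flags. */
    static List<String> aggregateSteps(boolean groupByEmpty, boolean orderPreserving,
                                       boolean useHashAgg, boolean sortRequired, String keys) {
        List<String> planSteps = new ArrayList<>();
        if (groupByEmpty) {
            planSteps.add("CLIENT AGGREGATE INTO SINGLE ROW");
        } else if (orderPreserving) {
            // Input already arrives in GROUP BY key order, so no sort step is reported.
            planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + keys);
        } else if (useHashAgg) {
            planSteps.add("CLIENT HASH AGGREGATE INTO DISTINCT ROWS BY " + keys);
            if (sortRequired) {
                // Assumed placement: the hash aggregate sorts its aggregated rows afterwards.
                planSteps.add("CLIENT SORTED BY " + keys);
            }
        } else {
            // Sort-based aggregation: sort the input first, then aggregate.
            planSteps.add("CLIENT SORTED BY " + keys);
            planSteps.add("CLIENT AGGREGATE INTO DISTINCT ROWS BY " + keys);
        }
        return planSteps;
    }

    public static void main(String[] args) {
        for (String step : aggregateSteps(false, false, true, true, "[K1, K2]")) {
            System.out.println(step);
        }
    }
}
```

With this shape, a test can check for the presence or absence of the CLIENT SORTED BY line directly in the explain output.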


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-25 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r205229353
  
--- Diff: 
phoenix-core/src/it/java/org/apache/phoenix/end2end/ClientHashAggregateIT.java 
---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.Test;
+
+public class ClientHashAggregateIT extends ParallelStatsDisabledIT {
--- End diff --

Add tests for when a sort (forward and reverse) is required for hash aggregate.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-25 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r205231380
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java 
---
@@ -135,17 +142,25 @@ public ResultIterator iterator(ParallelScanGrouper 
scanGrouper, Scan scan) throw
 aggResultIterator = new 
ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator),
 serverAggregators);
 aggResultIterator = new 
UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator),
 clientAggregators);
 } else {
-if (!groupBy.isOrderPreserving()) {
-int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(
-QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-List keyExpressions = 
groupBy.getKeyExpressions();
+List keyExpressions = groupBy.getKeyExpressions();
+if (groupBy.isOrderPreserving()) {
+aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, keyExpressions);
+} else {
+int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt
+(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
 List keyExpressionOrderBy = 
Lists.newArrayListWithExpectedSize(keyExpressions.size());
 for (Expression keyExpression : keyExpressions) {
 keyExpressionOrderBy.add(new 
OrderByExpression(keyExpression, false, true));
 }
-iterator = new OrderedResultIterator(iterator, 
keyExpressionOrderBy, thresholdBytes, null, null, 
projector.getEstimatedRowByteSize());
+
+if (useHashAgg) {
+boolean sort = orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY;
+aggResultIterator = new 
ClientHashAggregatingResultIterator(context, iterator, serverAggregators, 
keyExpressions, sort);
--- End diff --

You’ll need to pass through whether the scan is forward or reverse. You might 
just pass through orderBy instead of the boolean. Better still would be to let 
the code below insert an Ordering result iterator so you wouldn’t need the sort 
logic at all in your new iterator.
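
To make the "pass through orderBy instead of the boolean" idea concrete, here is a small sketch using only the JDK. The enum `RowKeyOrder` and the class `KeyOrderSketch` are hypothetical stand-ins for the three OrderBy cases discussed in this thread; the real iterator works on byte keys, not strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/** Hypothetical sketch: a three-way ordering flag instead of a boolean "sort". */
final class KeyOrderSketch {

    /** Stand-in for "no ordering", "forward row key order" and "reverse row key order". */
    enum RowKeyOrder { NONE, FORWARD, REVERSE }

    /** Returns a copy of the distinct group keys in the requested order. */
    static List<String> orderKeys(List<String> keys, RowKeyOrder order) {
        List<String> result = new ArrayList<>(keys);
        switch (order) {
            case FORWARD:
                Collections.sort(result);
                break;
            case REVERSE:
                result.sort(Comparator.reverseOrder());
                break;
            default:
                // NONE: keep whatever order the hash table produced.
                break;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("b", "a", "c");
        System.out.println(orderKeys(keys, RowKeyOrder.FORWARD));
        System.out.println(orderKeys(keys, RowKeyOrder.REVERSE));
    }
}
```

A boolean can only say "sort or don't"; passing the ordering itself lets the same code path handle a reverse order if one is ever required.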


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-25 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r205230186
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java 
---
@@ -135,17 +142,25 @@ public ResultIterator iterator(ParallelScanGrouper 
scanGrouper, Scan scan) throw
 aggResultIterator = new 
ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator),
 serverAggregators);
 aggResultIterator = new 
UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator),
 clientAggregators);
 } else {
-if (!groupBy.isOrderPreserving()) {
-int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(
-QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-List keyExpressions = 
groupBy.getKeyExpressions();
+List keyExpressions = groupBy.getKeyExpressions();
+if (groupBy.isOrderPreserving()) {
+aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, keyExpressions);
+} else {
+int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt
+(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
 List keyExpressionOrderBy = 
Lists.newArrayListWithExpectedSize(keyExpressions.size());
 for (Expression keyExpression : keyExpressions) {
 keyExpressionOrderBy.add(new 
OrderByExpression(keyExpression, false, true));
 }
-iterator = new OrderedResultIterator(iterator, 
keyExpressionOrderBy, thresholdBytes, null, null, 
projector.getEstimatedRowByteSize());
+
+if (useHashAgg) {
+boolean sort = orderBy == OrderBy.FWD_ROW_KEY_ORDER_BY;
--- End diff --

Should be true if OrderBy.REV_ROW_KEY_ORDER_BY too.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-25 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r205206944
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java 
---
@@ -135,17 +142,24 @@ public ResultIterator iterator(ParallelScanGrouper 
scanGrouper, Scan scan) throw
 aggResultIterator = new 
ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator),
 serverAggregators);
 aggResultIterator = new 
UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator),
 clientAggregators);
 } else {
-if (!groupBy.isOrderPreserving()) {
-int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(
-QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-List keyExpressions = 
groupBy.getKeyExpressions();
+List keyExpressions = groupBy.getKeyExpressions();
+if (groupBy.isOrderPreserving()) {
+aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, keyExpressions);
+} else {
+int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt
+(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
 List keyExpressionOrderBy = 
Lists.newArrayListWithExpectedSize(keyExpressions.size());
 for (Expression keyExpression : keyExpressions) {
 keyExpressionOrderBy.add(new 
OrderByExpression(keyExpression, false, true));
 }
-iterator = new OrderedResultIterator(iterator, 
keyExpressionOrderBy, thresholdBytes, null, null, 
projector.getEstimatedRowByteSize());
+
+if (useHashAgg) {
+aggResultIterator = new 
ClientHashAggregatingResultIterator(context, iterator, serverAggregators, 
keyExpressions);
+} else {
+iterator = new OrderedResultIterator(iterator, 
keyExpressionOrderBy, thresholdBytes, null, null, 
projector.getEstimatedRowByteSize());
+aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, keyExpressions);
+}
 }
-aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, groupBy.getKeyExpressions());
 aggResultIterator = new 
GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator),
 clientAggregators);
 }
 
--- End diff --

Hi @JamesRTaylor - please review. The sort is now done only when necessary. 
Thanks.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-23 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r204495020
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java 
---
@@ -135,17 +142,24 @@ public ResultIterator iterator(ParallelScanGrouper 
scanGrouper, Scan scan) throw
 aggResultIterator = new 
ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator),
 serverAggregators);
 aggResultIterator = new 
UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator),
 clientAggregators);
 } else {
-if (!groupBy.isOrderPreserving()) {
-int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(
-QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-List keyExpressions = 
groupBy.getKeyExpressions();
+List keyExpressions = groupBy.getKeyExpressions();
+if (groupBy.isOrderPreserving()) {
+aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, keyExpressions);
+} else {
+int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt
+(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
 List keyExpressionOrderBy = 
Lists.newArrayListWithExpectedSize(keyExpressions.size());
 for (Expression keyExpression : keyExpressions) {
 keyExpressionOrderBy.add(new 
OrderByExpression(keyExpression, false, true));
 }
-iterator = new OrderedResultIterator(iterator, 
keyExpressionOrderBy, thresholdBytes, null, null, 
projector.getEstimatedRowByteSize());
+
+if (useHashAgg) {
+aggResultIterator = new 
ClientHashAggregatingResultIterator(context, iterator, serverAggregators, 
keyExpressions);
+} else {
+iterator = new OrderedResultIterator(iterator, 
keyExpressionOrderBy, thresholdBytes, null, null, 
projector.getEstimatedRowByteSize());
+aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, keyExpressions);
+}
 }
-aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, groupBy.getKeyExpressions());
 aggResultIterator = new 
GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator),
 clientAggregators);
 }
 
--- End diff --

@JamesRTaylor - Done. Please review. Thanks.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-20 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r204197328
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java 
---
@@ -135,17 +142,24 @@ public ResultIterator iterator(ParallelScanGrouper 
scanGrouper, Scan scan) throw
 aggResultIterator = new 
ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator),
 serverAggregators);
 aggResultIterator = new 
UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator),
 clientAggregators);
 } else {
-if (!groupBy.isOrderPreserving()) {
-int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt(
-QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-List keyExpressions = 
groupBy.getKeyExpressions();
+List keyExpressions = groupBy.getKeyExpressions();
+if (groupBy.isOrderPreserving()) {
+aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, keyExpressions);
+} else {
+int thresholdBytes = 
context.getConnection().getQueryServices().getProps().getInt
+(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
 List keyExpressionOrderBy = 
Lists.newArrayListWithExpectedSize(keyExpressions.size());
 for (Expression keyExpression : keyExpressions) {
 keyExpressionOrderBy.add(new 
OrderByExpression(keyExpression, false, true));
 }
-iterator = new OrderedResultIterator(iterator, 
keyExpressionOrderBy, thresholdBytes, null, null, 
projector.getEstimatedRowByteSize());
+
+if (useHashAgg) {
+aggResultIterator = new 
ClientHashAggregatingResultIterator(context, iterator, serverAggregators, 
keyExpressions);
+} else {
+iterator = new OrderedResultIterator(iterator, 
keyExpressionOrderBy, thresholdBytes, null, null, 
projector.getEstimatedRowByteSize());
+aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, keyExpressions);
+}
 }
-aggResultIterator = new 
ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), 
serverAggregators, groupBy.getKeyExpressions());
 aggResultIterator = new 
GroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator),
 clientAggregators);
 }
 
--- End diff --

In the below if statement, you should still insert the 
OrderedAggregatingResultIterator if a hash aggregation is being done and 
orderBy != OrderBy.EMPTY_ORDER_BY:

// Still sort the aggregated rows if we're using a hash aggregation and
// the order by was optimized out, since the rows won't be in GROUP BY key order
if (orderBy.getOrderByExpressions().isEmpty() && (!useHashAgg || orderBy != OrderBy.EMPTY_ORDER_BY)) {


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-20 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r204197232
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SizedUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List groupByExpressions;
+private final int thresholdBytes;
+private final MemoryChunk memoryChunk;
+private HashMap hash;
+private List keyList;
+private Iterator keyIterator;
+
+public ClientHashAggregatingResultIterator(StatementContext context, 
ResultIterator resultIterator, Aggregators aggregators,
+   List 
groupByExpressions, int thresholdBytes) {
+
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+memoryChunk = 
context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
--- End diff --

@JamesRTaylor - I tried that as well, as part of trying to avoid this 
sort. That is, I tried catching this special case in ClientAggregatePlan and 
wrapping an OrderedAggregatingResultIterator there. It did not work, which is 
why I relented and implemented the sort this way.

One other detail: for the special case, this sort is faster than the 
generic sort (it's less general). Bottom line, I have been unable to make the 
patch correct without this sort.
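
For readers following the thread, the two phases named in the diff (`populateHash` and `sortKeys`) can be sketched with plain JDK collections. This is a simplified illustration, not the Phoenix iterator: the class `HashAggSketch`, the string keys, and the SUM aggregate are assumptions made for the example.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for client-side hash aggregation; not Phoenix code. */
final class HashAggSketch {

    /** Groups rows of {key, value} by key and sums the values; optionally orders groups by key. */
    static List<Map.Entry<String, Long>> aggregate(List<String[]> rows, boolean sort) {
        // Populate phase: one hash entry per distinct GROUP BY key.
        Map<String, Long> hash = new HashMap<>();
        for (String[] row : rows) {
            hash.merge(row[0], Long.parseLong(row[1]), Long::sum);
        }
        // Sort phase: only the distinct keys are sorted, never the input rows.
        List<String> keys = new ArrayList<>(hash.keySet());
        if (sort) {
            Collections.sort(keys);
        }
        List<Map.Entry<String, Long>> result = new ArrayList<>();
        for (String key : keys) {
            result.add(new AbstractMap.SimpleImmutableEntry<>(key, hash.get(key)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"b", "2"},
                new String[] {"a", "1"},
                new String[] {"a", "3"});
        System.out.println(aggregate(rows, true)); // prints [a=4, b=2]
    }
}
```

Sorting only the distinct keys is one plausible reason a specialized sort could be cheaper than a generic sort over full rows, though the thread itself does not spell out the cause.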


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-20 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r204196968
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SizedUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List groupByExpressions;
+private final int thresholdBytes;
+private final MemoryChunk memoryChunk;
+private HashMap hash;
+private List keyList;
+private Iterator keyIterator;
+
+public ClientHashAggregatingResultIterator(StatementContext context, 
ResultIterator resultIterator, Aggregators aggregators,
+   List 
groupByExpressions, int thresholdBytes) {
+
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+memoryChunk = 
context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
--- End diff --

See my comment in ClientAggregatePlan.java. I believe you can detect this 
corner case there and not need this sort. It's a little weird to have a hash 
aggregation that still does a sort (but I get your point about it being better 
than doing the sort before).


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-20 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r20419
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SizedUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List groupByExpressions;
+private final int thresholdBytes;
+private final MemoryChunk memoryChunk;
+private HashMap hash;
+private List keyList;
+private Iterator keyIterator;
+
+public ClientHashAggregatingResultIterator(StatementContext context, 
ResultIterator resultIterator, Aggregators aggregators,
+   List 
groupByExpressions, int thresholdBytes) {
+
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+memoryChunk = 
context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
--- End diff --

I tried avoiding the sort, but could not make the code work without it. In 
my use case, if there's an ORDER BY after the groups, and the ORDER BY was by 
primary key, it was not being applied. If the ORDER BY was not by primary key, 
it was being applied.

I left this sort in because (1) it introduces no error; (2) the sorting is 
after the grouping, so more efficient than the current solution, which sorts 
before the grouping; (3) it avoids the problem above.
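
To illustrate point (2), here is a minimal standalone sketch of aggregating 
into a hash map first and sorting only the distinct group keys afterwards. It 
is not the code in this patch: the Row class, the String keys and the SUM 
aggregate are assumptions made for the example, and only the method names 
populateHash and sortKeys are borrowed from the diff above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashThenSortSketch {

    // Hypothetical input row: a group-by key and a value to SUM.
    static final class Row {
        final String key;
        final long value;

        Row(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Step 1: a single pass over unsorted input, accumulating one sum per key.
    static Map<String, Long> populateHash(List<Row> rows) {
        Map<String, Long> hash = new HashMap<>();
        for (Row row : rows) {
            hash.merge(row.key, row.value, Long::sum);
        }
        return hash;
    }

    // Step 2: sort the distinct keys only, not the input rows.
    static List<String> sortKeys(Map<String, Long> hash) {
        List<String> keys = new ArrayList<>(hash.keySet());
        Collections.sort(keys);
        return keys;
    }

    // Emits the groups in key order as "key=sum;" pairs.
    static String run(List<Row> rows) {
        Map<String, Long> hash = populateHash(rows);
        StringBuilder out = new StringBuilder();
        for (String key : sortKeys(hash)) {
            out.append(key).append('=').append(hash.get(key)).append(';');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        List<Row> rows = new ArrayList<>();
        rows.add(new Row("b", 2));
        rows.add(new Row("a", 1));
        rows.add(new Row("b", 3));
        rows.add(new Row("c", 4));
        rows.add(new Row("a", 5));
        System.out.println(run(rows));
    }
}
```

With N input rows and G distinct groups, the sort here touches G keys rather 
than N rows, which is where the saving over sorting before the grouping would 
come from when G is much smaller than N.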


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-20 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r204192760
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SizedUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List groupByExpressions;
+private final int thresholdBytes;
+private final MemoryChunk memoryChunk;
+private HashMap hash;
+private List keyList;
+private Iterator keyIterator;
+
+public ClientHashAggregatingResultIterator(StatementContext context, 
ResultIterator resultIterator, Aggregators aggregators,
+   List 
groupByExpressions, int thresholdBytes) {
+
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+memoryChunk = 
context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
+keyIterator = keyList.iterator();
+}
+
+if (!keyIterator.hasNext()) {
+return null;
+}
+
+ImmutableBytesWritable key = keyIterator.next();
+Aggregator[] rowAggregators = hash.get(key);
+byte[] value = aggregators.toBytes(rowAggregators);
+Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, 
SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+return tuple;
+}
+
+@Override
+public void close() throws SQLException {
+keyIterator = null;
+keyList = null;
+hash = null;
+

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-20 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r204192752
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SizedUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List groupByExpressions;
+private final int thresholdBytes;
+private final MemoryChunk memoryChunk;
+private HashMap hash;
+private List keyList;
+private Iterator keyIterator;
+
+public ClientHashAggregatingResultIterator(StatementContext context, 
ResultIterator resultIterator, Aggregators aggregators,
+   List 
groupByExpressions, int thresholdBytes) {
+
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+memoryChunk = 
context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
+keyIterator = keyList.iterator();
+}
+
+if (!keyIterator.hasNext()) {
+return null;
+}
+
+ImmutableBytesWritable key = keyIterator.next();
+Aggregator[] rowAggregators = hash.get(key);
+byte[] value = aggregators.toBytes(rowAggregators);
+Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, 
SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+return tuple;
+}
+
+@Override
+public void close() throws SQLException {
+keyIterator = null;
+keyList = null;
+hash = null;
+

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-20 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r204192739
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SizedUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List groupByExpressions;
+private final int thresholdBytes;
+private final MemoryChunk memoryChunk;
+private HashMap hash;
+private List keyList;
+private Iterator keyIterator;
+
+public ClientHashAggregatingResultIterator(StatementContext context, 
ResultIterator resultIterator, Aggregators aggregators,
+   List 
groupByExpressions, int thresholdBytes) {
+
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+memoryChunk = 
context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
+keyIterator = keyList.iterator();
+}
+
+if (!keyIterator.hasNext()) {
+return null;
+}
+
+ImmutableBytesWritable key = keyIterator.next();
+Aggregator[] rowAggregators = hash.get(key);
+byte[] value = aggregators.toBytes(rowAggregators);
+Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, 
SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+return tuple;
+}
+
+@Override
+public void close() throws SQLException {
+keyIterator = null;
+keyList = null;
+hash = null;
+

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-20 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r204167291
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SizedUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List groupByExpressions;
+private final int thresholdBytes;
+private final MemoryChunk memoryChunk;
+private HashMap hash;
+private List keyList;
+private Iterator keyIterator;
+
+public ClientHashAggregatingResultIterator(StatementContext context, 
ResultIterator resultIterator, Aggregators aggregators,
+   List 
groupByExpressions, int thresholdBytes) {
+
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+memoryChunk = 
context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
+keyIterator = keyList.iterator();
+}
+
+if (!keyIterator.hasNext()) {
+return null;
+}
+
+ImmutableBytesWritable key = keyIterator.next();
+Aggregator[] rowAggregators = hash.get(key);
+byte[] value = aggregators.toBytes(rowAggregators);
+Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, 
SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+return tuple;
+}
+
+@Override
+public void close() throws SQLException {
+keyIterator = null;
+keyList = null;
+hash = null;
+

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-20 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r204161104
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SizedUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List groupByExpressions;
+private final int thresholdBytes;
+private final MemoryChunk memoryChunk;
+private HashMap hash;
+private List keyList;
+private Iterator keyIterator;
+
+public ClientHashAggregatingResultIterator(StatementContext context, 
ResultIterator resultIterator, Aggregators aggregators,
+   List 
groupByExpressions, int thresholdBytes) {
+
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+memoryChunk = 
context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
+keyIterator = keyList.iterator();
+}
+
+if (!keyIterator.hasNext()) {
+return null;
+}
+
+ImmutableBytesWritable key = keyIterator.next();
+Aggregator[] rowAggregators = hash.get(key);
+byte[] value = aggregators.toBytes(rowAggregators);
+Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, 
SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+return tuple;
+}
+
+@Override
+public void close() throws SQLException {
+keyIterator = null;
+keyList = null;
+hash = null;
+

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-20 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r204176238
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SizedUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List groupByExpressions;
+private final int thresholdBytes;
+private final MemoryChunk memoryChunk;
+private HashMap hash;
+private List keyList;
+private Iterator keyIterator;
+
+public ClientHashAggregatingResultIterator(StatementContext context, 
ResultIterator resultIterator, Aggregators aggregators,
+   List 
groupByExpressions, int thresholdBytes) {
+
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
--- End diff --

I don't think this sort is required at all. You've already guaranteed that 
each row has a unique row key, so the subsequent 
GroupedAggregatingResultIterator should work fine: it expects duplicate rows 
to be adjacent, and since every row is unique, that holds trivially. If there's 
an ORDER BY on the groups, Phoenix will insert an ordering result iterator.
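
To illustrate the point about uniqueness, here is a minimal sketch in plain Java, not Phoenix code. The names `HashAggSketch`, `row`, `hashAggregate` and `mergeAdjacent` are hypothetical, and `mergeAdjacent` only stands in for a downstream step that combines a row with its predecessor when their keys match. It assumes a simple SUM per group.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HashAggSketch {
    // Hypothetical row type: (group key, value). Not a Phoenix Tuple.
    static Map.Entry<String, Long> row(String key, long value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Hash aggregation: one map entry per distinct key, input order is irrelevant.
    static Map<String, Long> hashAggregate(List<Map.Entry<String, Long>> rows) {
        Map<String, Long> groups = new HashMap<>();
        for (Map.Entry<String, Long> r : rows) {
            groups.merge(r.getKey(), r.getValue(), Long::sum);
        }
        return groups;
    }

    // Downstream step: merges a row into its predecessor only when the keys match.
    static List<Map.Entry<String, Long>> mergeAdjacent(Iterable<Map.Entry<String, Long>> rows) {
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (Map.Entry<String, Long> r : rows) {
            int last = out.size() - 1;
            if (last >= 0 && out.get(last).getKey().equals(r.getKey())) {
                out.set(last, row(r.getKey(), out.get(last).getValue() + r.getValue()));
            } else {
                out.add(row(r.getKey(), r.getValue()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> rows = new ArrayList<>();
        rows.add(row("b", 1));
        rows.add(row("a", 2));
        rows.add(row("b", 3));
        rows.add(row("a", 4));
        Map<String, Long> groups = hashAggregate(rows);
        // Raw unsorted input: equal keys are not adjacent, so nothing is merged.
        System.out.println(mergeAdjacent(rows).size());
        // Hash output: every key is already unique, so no sort is needed first.
        System.out.println(mergeAdjacent(groups.entrySet()).size());
    }
}
```

The adjacent-merge step needs sorted input only when a key can occur more than once. After hash aggregation each key occurs exactly once, so any iteration order of the map is acceptable to it.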


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-20 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r204168576
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.SizedUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final int CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List<Expression> groupByExpressions;
+private final int thresholdBytes;
+private final MemoryChunk memoryChunk;
+private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+private List<ImmutableBytesWritable> keyList;
+private Iterator<ImmutableBytesWritable> keyIterator;
+
+public ClientHashAggregatingResultIterator(StatementContext context, ResultIterator resultIterator, Aggregators aggregators,
+   List<Expression> groupByExpressions, int thresholdBytes) {
+
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+memoryChunk = context.getConnection().getQueryServices().getMemoryManager().allocate(CLIENT_HASH_AGG_MEMORY_CHUNK_SIZE);
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
+keyIterator = keyList.iterator();
+}
+
+if (!keyIterator.hasNext()) {
+return null;
+}
+
+ImmutableBytesWritable key = keyIterator.next();
+Aggregator[] rowAggregators = hash.get(key);
+byte[] value = aggregators.toBytes(rowAggregators);
+Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+return tuple;
+}
+
+@Override
+public void close() throws SQLException {
+keyIterator = null;
+keyList = null;
+hash = null;
+

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-19 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r203926206
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List<Expression> groupByExpressions;
+private final int thresholdBytes;
+private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+private List<ImmutableBytesWritable> keyList;
+private Iterator<ImmutableBytesWritable> keyIterator;
+
+public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions, int thresholdBytes) {
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
+keyIterator = keyList.iterator();
+}
+
+if (!keyIterator.hasNext()) {
+return null;
+}
+
+ImmutableBytesWritable key = keyIterator.next();
+Aggregator[] rowAggregators = hash.get(key);
+byte[] value = aggregators.toBytes(rowAggregators);
+Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+return tuple;
+}
+
+@Override
+public void close() throws SQLException {
+keyIterator = null;
+keyList = null;
+hash = null;
+resultIterator.close();
+}
+
+@Override
+public Aggregator[] aggregate(Tuple result) {
+Aggregator[] rowAggregators = aggregators.getAggregators();
+aggregators.reset(rowAggregators);
+aggregators.aggregate(rowAggregators, result);
+return rowAggregators;
+}
+
+@Override
+public void explain(List<String> planSteps) {
+resultIterator.explain(planSteps);
+}
+
+@Override
+public String 

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-19 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r203926179
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List<Expression> groupByExpressions;
+private final int thresholdBytes;
+private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+private List<ImmutableBytesWritable> keyList;
+private Iterator<ImmutableBytesWritable> keyIterator;
+
+public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions, int thresholdBytes) {
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
+keyIterator = keyList.iterator();
+}
+
+if (!keyIterator.hasNext()) {
+return null;
+}
+
+ImmutableBytesWritable key = keyIterator.next();
+Aggregator[] rowAggregators = hash.get(key);
+byte[] value = aggregators.toBytes(rowAggregators);
+Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+return tuple;
+}
+
+@Override
+public void close() throws SQLException {
+keyIterator = null;
+keyList = null;
+hash = null;
+resultIterator.close();
+}
+
+@Override
+public Aggregator[] aggregate(Tuple result) {
+Aggregator[] rowAggregators = aggregators.getAggregators();
+aggregators.reset(rowAggregators);
+aggregators.aggregate(rowAggregators, result);
+return rowAggregators;
+}
+
+@Override
+public void explain(List<String> planSteps) {
+resultIterator.explain(planSteps);
+}
+
+@Override
+public String 

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-19 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r203926229
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java 
---
@@ -135,17 +142,24 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
 aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
 aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
 } else {
-if (!groupBy.isOrderPreserving()) {
-int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
-QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-List<Expression> keyExpressions = groupBy.getKeyExpressions();
+List<Expression> keyExpressions = groupBy.getKeyExpressions();
+if (groupBy.isOrderPreserving()) {
+aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
--- End diff --

Done.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-19 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r203867850
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List<Expression> groupByExpressions;
+private final int thresholdBytes;
+private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+private List<ImmutableBytesWritable> keyList;
+private Iterator<ImmutableBytesWritable> keyIterator;
+
+public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions, int thresholdBytes) {
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
+keyIterator = keyList.iterator();
+}
+
+if (!keyIterator.hasNext()) {
+return null;
+}
+
+ImmutableBytesWritable key = keyIterator.next();
+Aggregator[] rowAggregators = hash.get(key);
+byte[] value = aggregators.toBytes(rowAggregators);
+Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+return tuple;
+}
+
+@Override
+public void close() throws SQLException {
+keyIterator = null;
+keyList = null;
+hash = null;
+resultIterator.close();
+}
+
+@Override
+public Aggregator[] aggregate(Tuple result) {
+Aggregator[] rowAggregators = aggregators.getAggregators();
+aggregators.reset(rowAggregators);
+aggregators.aggregate(rowAggregators, result);
+return rowAggregators;
+}
+
+@Override
+public void explain(List<String> planSteps) {
+resultIterator.explain(planSteps);
+}
+
+@Override
+public 

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-19 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r203882988
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashSizeException.java
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.iterate;
+
+/**
+ * Thrown by {@link org.apache.phoenix.iterate.ClientHashAggregatingResultIterator } when
+ * hash size exceeds memory threshold.
+ * 
+ */
+public class ClientHashSizeException extends RuntimeException {
--- End diff --

You won't need this, as an InsufficientMemoryException will be thrown if you 
go above the specified memory limit (based on existing Phoenix config 
properties), and it will be unwound into a SQLException with the code 
SQLExceptionCode.INSUFFICIENT_MEMORY.
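
A rough sketch of that pattern in plain Java, under stated assumptions: `Budget` and `BudgetExceededException` are hypothetical stand-ins for a memory manager and its unchecked failure, not the Phoenix classes, and the fixed per-group size is invented for illustration. The idea is only that the hash-building loop reserves memory as it adds groups, and the limit failure surfaces to the caller as a `SQLException` rather than as a custom exception type.

```java
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

class MemoryBudgetSketch {
    // Hypothetical unchecked failure raised when the limit is exceeded.
    static class BudgetExceededException extends RuntimeException {
        BudgetExceededException(String message) {
            super(message);
        }
    }

    // Hypothetical tracker that refuses to grow past a fixed byte limit.
    static class Budget {
        private final long limitBytes;
        private long usedBytes;

        Budget(long limitBytes) {
            this.limitBytes = limitBytes;
        }

        void reserve(long bytes) {
            if (usedBytes + bytes > limitBytes) {
                throw new BudgetExceededException(
                        "requested " + bytes + " bytes with " + (limitBytes - usedBytes) + " left");
            }
            usedBytes += bytes;
        }
    }

    // Counts rows per key, reserving memory once for each new group.
    static Map<String, Long> countByKey(Iterable<String> keys, Budget budget, long bytesPerGroup)
            throws SQLException {
        Map<String, Long> counts = new HashMap<>();
        try {
            for (String key : keys) {
                if (!counts.containsKey(key)) {
                    budget.reserve(bytesPerGroup);
                }
                counts.merge(key, 1L, Long::sum);
            }
        } catch (BudgetExceededException e) {
            // Convert the unchecked limit failure into the checked SQL error the caller expects.
            throw new SQLException("Insufficient memory for hash aggregation", e);
        }
        return counts;
    }
}
```

With this shape the aggregation code does not need its own size check or exception class; the component that owns the limit is the only place that decides when the limit has been crossed.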


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-19 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r203868079
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List<Expression> groupByExpressions;
+private final int thresholdBytes;
+private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+private List<ImmutableBytesWritable> keyList;
+private Iterator<ImmutableBytesWritable> keyIterator;
+
+public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions, int thresholdBytes) {
+Objects.requireNonNull(resultIterator);
+Objects.requireNonNull(aggregators);
+Objects.requireNonNull(groupByExpressions);
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+this.thresholdBytes = thresholdBytes;
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+hash = populateHash();
+keyList = sortKeys();
+keyIterator = keyList.iterator();
+}
+
+if (!keyIterator.hasNext()) {
+return null;
+}
+
+ImmutableBytesWritable key = keyIterator.next();
+Aggregator[] rowAggregators = hash.get(key);
+byte[] value = aggregators.toBytes(rowAggregators);
+Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+return tuple;
+}
+
+@Override
+public void close() throws SQLException {
+keyIterator = null;
+keyList = null;
+hash = null;
+resultIterator.close();
+}
+
+@Override
+public Aggregator[] aggregate(Tuple result) {
+Aggregator[] rowAggregators = aggregators.getAggregators();
+aggregators.reset(rowAggregators);
+aggregators.aggregate(rowAggregators, result);
+return rowAggregators;
+}
+
+@Override
+public void explain(List<String> planSteps) {
+resultIterator.explain(planSteps);
+}
+
+@Override
+public 

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-19 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r203879864
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java 
---
@@ -135,17 +142,24 @@ public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) throw
 aggResultIterator = new ClientUngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators);
 aggResultIterator = new UngroupedAggregatingResultIterator(LookAheadResultIterator.wrap(aggResultIterator), clientAggregators);
 } else {
-if (!groupBy.isOrderPreserving()) {
-int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
-QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-List<Expression> keyExpressions = groupBy.getKeyExpressions();
+List<Expression> keyExpressions = groupBy.getKeyExpressions();
+if (groupBy.isOrderPreserving()) {
+aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
--- End diff --

Pass context through to ClientGroupedAggregatingResultIterator here too, as 
you'll need it to get the memory manager.
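
As a sketch of what passing the context buys, here is a self-contained Java outline. `Pool`, `Chunk`, `Context` and `GroupingIterator` are hypothetical stand-ins, not the Phoenix classes. The assumption is that the iterator obtains its memory from a shared pool reachable through the context and hands it back when closed.

```java
class ContextPassingSketch {
    // Hypothetical shared pool standing in for a memory manager.
    static class Pool {
        private long available;

        Pool(long available) {
            this.available = available;
        }

        synchronized Chunk allocate(long bytes) {
            if (bytes > available) {
                throw new IllegalStateException("pool exhausted");
            }
            available -= bytes;
            return new Chunk(this, bytes);
        }

        synchronized void release(long bytes) {
            available += bytes;
        }

        synchronized long available() {
            return available;
        }
    }

    // A reservation that returns its bytes to the pool when closed.
    static class Chunk implements AutoCloseable {
        private final Pool pool;
        private long bytes;

        Chunk(Pool pool, long bytes) {
            this.pool = pool;
            this.bytes = bytes;
        }

        @Override
        public void close() {
            pool.release(bytes);
            bytes = 0; // a second close() releases nothing
        }
    }

    // Hypothetical context: the only route from an iterator to the shared pool.
    static class Context {
        final Pool pool;

        Context(Pool pool) {
            this.pool = pool;
        }
    }

    // The iterator takes the context so it can reserve memory up front.
    static class GroupingIterator implements AutoCloseable {
        private final Chunk chunk;

        GroupingIterator(Context context, long initialBytes) {
            this.chunk = context.pool.allocate(initialBytes);
        }

        @Override
        public void close() {
            chunk.close();
        }
    }
}
```

Without the context argument the iterator has no handle on the shared pool, so it can neither account for its memory nor release it in `close()`.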


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-18 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r203467413
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List<Expression> groupByExpressions;
+private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+private List<ImmutableBytesWritable> keyList;
+private Iterator<ImmutableBytesWritable> keyIterator;
+
+public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
+if (resultIterator == null) throw new NullPointerException();
+if (aggregators == null) throw new NullPointerException();
+if (groupByExpressions == null) throw new NullPointerException();
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+populateHash();
+sortKeys();
+keyIterator = keyList.iterator();
+}
+
+if (!keyIterator.hasNext()) {
+return null;
+}
+
+ImmutableBytesWritable key = keyIterator.next();
+Aggregator[] rowAggregators = hash.get(key);
+byte[] value = aggregators.toBytes(rowAggregators);
+Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+return tuple;
+}
+
+@Override
+public void close() throws SQLException {
+keyIterator = null;
+keyList = null;
+hash = null;
+resultIterator.close();
+}
+
+@Override
+public Aggregator[] aggregate(Tuple result) {
+Aggregator[] rowAggregators = aggregators.getAggregators();
+aggregators.reset(rowAggregators);
+aggregators.aggregate(rowAggregators, result);
+return rowAggregators;
+}
+
+@Override
+public void explain(List<String> planSteps) {
+resultIterator.explain(planSteps);
+}
+
+@Override
+public String toString() {
+return "ClientHashAggregatingResultIterator [resultIterator=" 
++ 

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-17 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r203099292
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List<Expression> groupByExpressions;
+private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+private List<ImmutableBytesWritable> keyList;
+private Iterator<ImmutableBytesWritable> keyIterator;
+
+public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
+if (resultIterator == null) throw new NullPointerException();
+if (aggregators == null) throw new NullPointerException();
+if (groupByExpressions == null) throw new NullPointerException();
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+populateHash();
+sortKeys();
+keyIterator = keyList.iterator();
+}
+
+if (!keyIterator.hasNext()) {
+return null;
+}
+
+ImmutableBytesWritable key = keyIterator.next();
+Aggregator[] rowAggregators = hash.get(key);
+byte[] value = aggregators.toBytes(rowAggregators);
+Tuple tuple = wrapKeyValueAsResult(KeyValueUtil.newKeyValue(key, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length));
+return tuple;
+}
+
+@Override
+public void close() throws SQLException {
+keyIterator = null;
+keyList = null;
+hash = null;
+resultIterator.close();
+}
+
+@Override
+public Aggregator[] aggregate(Tuple result) {
+Aggregator[] rowAggregators = aggregators.getAggregators();
+aggregators.reset(rowAggregators);
+aggregators.aggregate(rowAggregators, result);
+return rowAggregators;
+}
+
+@Override
+public void explain(List<String> planSteps) {
+resultIterator.explain(planSteps);
+}
+
+@Override
+public String toString() {
+return "ClientHashAggregatingResultIterator [resultIterator=" 
++ 

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-16 Thread JamesRTaylor
Github user JamesRTaylor commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r202842452
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-16 Thread solzy
Github user solzy commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r202593990
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-12 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r202112790
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-12 Thread solzy
Github user solzy commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r202024941
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---

[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-10 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r201562225
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List<Expression> groupByExpressions;
+private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+private List<ImmutableBytesWritable> keyList;
+private Iterator<ImmutableBytesWritable> keyIterator;
+
+public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
+if (resultIterator == null) throw new NullPointerException();
+if (aggregators == null) throw new NullPointerException();
+if (groupByExpressions == null) throw new NullPointerException();
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+populateHash();
--- End diff --

Made the side effects more explicit.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-10 Thread geraldss
Github user geraldss commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r201562154
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List<Expression> groupByExpressions;
+private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+private List<ImmutableBytesWritable> keyList;
+private Iterator<ImmutableBytesWritable> keyIterator;
+
+public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
+if (resultIterator == null) throw new NullPointerException();
--- End diff --

Done.


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-10 Thread joshelser
Github user joshelser commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r201468535
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List<Expression> groupByExpressions;
+private HashMap<ImmutableBytesWritable, Aggregator[]> hash;
+private List<ImmutableBytesWritable> keyList;
+private Iterator<ImmutableBytesWritable> keyIterator;
+
+public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List<Expression> groupByExpressions) {
+if (resultIterator == null) throw new NullPointerException();
+if (aggregators == null) throw new NullPointerException();
+if (groupByExpressions == null) throw new NullPointerException();
+this.resultIterator = resultIterator;
+this.aggregators = aggregators;
+this.groupByExpressions = groupByExpressions;
+}
+
+@Override
+public Tuple next() throws SQLException {
+if (keyIterator == null) {
+populateHash();
--- End diff --

A comment here would be nice to note the side-effect that `hash` and 
`keyList` are guaranteed to be non-null (and thus, the lack of the defensive 
null-checks below is OK).
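
One possible shape for that, as a rough sketch only: the class below is a hypothetical, simplified stand-in (`LazyInitSketch`, String keys, Integer counts), not the Phoenix iterator. It assumes the helpers can return their results, so the non-null guarantee is visible at the call site and can be stated in a comment there.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified iterator: counts occurrences of each String key.
final class LazyInitSketch {
    private final List<String> input;
    private Map<String, Integer> hash;     // null until the first next()
    private List<String> keyList;          // null until the first next()
    private Iterator<String> keyIterator;  // null until the first next()

    LazyInitSketch(List<String> input) {
        this.input = input;
    }

    String next() {
        if (keyIterator == null) {
            // The helpers return their results instead of assigning fields
            // behind the scenes, so it is visible here that hash and keyList
            // are non-null from this point on; no null checks are needed below.
            hash = populateHash();
            keyList = sortKeys();
            keyIterator = keyList.iterator();
        }
        if (!keyIterator.hasNext()) {
            return null;
        }
        String key = keyIterator.next();
        return key + "=" + hash.get(key);
    }

    private Map<String, Integer> populateHash() {
        Map<String, Integer> counts = new HashMap<>();
        for (String key : input) {
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    private List<String> sortKeys() {
        List<String> keys = new ArrayList<>(hash.keySet());
        Collections.sort(keys);
        return keys;
    }
}
```

With input `b, a, b`, successive calls to `next()` would yield the keys in sorted order with their counts, then `null` once the keys are exhausted.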


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-10 Thread joshelser
Github user joshelser commented on a diff in the pull request:

https://github.com/apache/phoenix/pull/308#discussion_r201465583
  
--- Diff: 
phoenix-core/src/main/java/org/apache/phoenix/iterate/ClientHashAggregatingResultIterator.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
+import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.Aggregators;
+import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.KeyValueUtil;
+import org.apache.phoenix.util.TupleUtil;
+
+/**
+ * 
+ * This class implements client-side hash aggregation in memory.
+ * Issue https://issues.apache.org/jira/browse/PHOENIX-4751.
+ * 
+ */
+public class ClientHashAggregatingResultIterator
+implements AggregatingResultIterator {
+
+private static final int HASH_AGG_INIT_SIZE = 64*1024;
+private static final byte[] UNITIALIZED_KEY_BUFFER = new byte[0];
+private final ResultIterator resultIterator;
+private final Aggregators aggregators;
+private final List groupByExpressions;
+private HashMap hash;
+private List keyList;
+private Iterator keyIterator;
+
+public ClientHashAggregatingResultIterator(ResultIterator resultIterator, Aggregators aggregators, List groupByExpressions) {
+if (resultIterator == null) throw new NullPointerException();
--- End diff --

This null check could be consolidated into
`Objects.requireNonNull(resultIterator)` from `java.util.Objects`.
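
A minimal sketch of the suggested pattern, for illustration only. The class and
field names below are hypothetical stand-ins, not the code in this pull request;
the only real API used is `java.util.Objects.requireNonNull`, which returns its
argument and throws `NullPointerException` when that argument is null.

```java
import java.util.Objects;

// Hypothetical stand-in for the iterator class in the diff; only the
// null-check pattern is the point here.
final class NullCheckSketch {
    private final Object resultIterator;
    private final Object aggregators;

    NullCheckSketch(Object resultIterator, Object aggregators) {
        // requireNonNull returns its argument, so the check and the
        // assignment fit on one line. The second argument becomes the
        // exception message, which makes the failing parameter obvious.
        this.resultIterator = Objects.requireNonNull(resultIterator, "resultIterator");
        this.aggregators = Objects.requireNonNull(aggregators, "aggregators");
    }

    public static void main(String[] args) {
        new NullCheckSketch("rows", "aggs"); // both arguments present: no exception
        try {
            new NullCheckSketch(null, "aggs");
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // prints "resultIterator"
        }
    }
}
```

Compared with the explicit `if (x == null) throw new NullPointerException();`
form, this keeps one line per parameter and names the offending argument.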


---


[GitHub] phoenix pull request #308: Client-side hash aggregation

2018-07-02 Thread geraldss
GitHub user geraldss opened a pull request:

https://github.com/apache/phoenix/pull/308

Client-side hash aggregation

Client-side hash aggregation for use with sort-merge join.

Implements https://issues.apache.org/jira/browse/PHOENIX-4751
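
For readers unfamiliar with the technique, here is a minimal sketch of
in-memory hash aggregation. It is not the code in this pull request: the class
name, the `String[]` row layout, and the choice of a SUM aggregate are all
assumptions made for illustration. Rows that arrive in an order unrelated to
the GROUP BY key are folded into a hash map keyed by group, and the keys are
sorted afterwards only if ordered output is wanted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of hash aggregation; not Phoenix code.
final class HashAggSketch {

    // Each row is {groupKey, value}. Computes SUM(value) per group key
    // in a single pass, without requiring the input to be sorted.
    static Map<String, Long> aggregate(List<String[]> rows) {
        Map<String, Long> sums = new HashMap<>();
        for (String[] row : rows) {
            // merge() inserts the value for a new key, or combines it
            // with the existing running total via Long::sum.
            sums.merge(row[0], Long.parseLong(row[1]), Long::sum);
        }
        return sums;
    }

    // Optional post-step: return the group keys in ascending order.
    static List<String> sortedKeys(Map<String, Long> sums) {
        List<String> keys = new ArrayList<>(sums.keySet());
        Collections.sort(keys);
        return keys;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"b", "2"},
                new String[] {"a", "1"},
                new String[] {"b", "3"});
        Map<String, Long> sums = aggregate(rows);
        for (String key : sortedKeys(sums)) {
            System.out.println(key + " -> " + sums.get(key));
        }
    }
}
```

The trade-off in this sketch is memory for ordering: every distinct group key
is held in the map at once, in exchange for not having to sort the full input
before aggregating.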


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/geraldss/phoenix master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/phoenix/pull/308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #308


commit c8acc6cb39e222a5206c79566552c5c27cbe27f1
Author: Gerald Sangudi 
Date:   2018-06-14T19:49:30Z

PHOENIX-4751 Add HASH_AGGREGATE hint

commit a261b3f94f753b4a8d6baaad6168e76f97d76bb6
Author: Gerald Sangudi 
Date:   2018-06-16T04:17:32Z

PHOENIX-4751 Begin implementation of client hash aggregation

commit 863d24e34a83282f90d5d2db05522b678dfced74
Author: Rajeshbabu Chintaguntla 
Date:   2018-06-15T22:38:44Z

PHOENIX-4786 Reduce log level to debug when logging new aggregate row key found and added results for scan ordered queries(Rajeshbabu)

commit cfae7ddcfa5b58a367cd0c57c23f394ceb9f1259
Author: Gerald Sangudi 
Date:   2018-06-16T04:55:00Z

Merge remote-tracking branch 'upstream/master'

commit 1f453308a24be49a8036292671d51eb25137d680
Author: Gerald Sangudi 
Date:   2018-06-20T17:47:34Z

PHOENIX-4751 Generated aggregated results

commit 66aaacfd989c63e18fb9a5c5b9e133519ab93507
Author: Gerald Sangudi 
Date:   2018-06-24T23:18:14Z

PHOENIX-4751 Sort results of client hash aggregation

commit a6c2b7ce738710cfdffc1e9e4d1d234d2090a225
Author: James Taylor 
Date:   2018-06-18T13:00:02Z

PHOENIX-4789 Exception when setting TTL on Tephra transactional table

commit fba4196fcace83d4e42e902d2cb6295bb519ed39
Author: Ankit Singhal 
Date:   2018-06-21T23:11:02Z

PHOENIX-4785 Unable to write to table if index is made active during retry

commit 05de081b386c502b6c90ff24357ed7dbbc6dedd2
Author: Gerald Sangudi 
Date:   2018-06-29T05:01:55Z

PHOENIX-4751 Add integration test for client hash aggregation

commit b7960d0daedc6ce3c2fbcf0794e4a95639d7ba3c
Author: Gerald Sangudi 
Date:   2018-06-30T00:03:59Z

PHOENIX-4751 Fix and run integration tests for query results

commit a3629ac64b90c117f5caceddbb45fb9dc14649b8
Author: Gerald Sangudi 
Date:   2018-06-30T06:22:43Z

PHOENIX-4751 Add integration test for EXPLAIN

commit 3aa85d5c04309f6e0c5167c002e9dcb6091ea757
Author: Gerald Sangudi 
Date:   2018-06-30T17:13:17Z

PHOENIX-4751 Verify EXPLAIN plan for both salted and unsalted




---