[GitHub] [incubator-pinot] mayankshriv commented on a change in pull request #4602: First pass of GROUP BY with ORDER BY support

2019-10-08 Thread GitBox
mayankshriv commented on a change in pull request #4602: First pass of GROUP BY 
with ORDER BY support
URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r332620855
 
 

 ##
 File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/table/Record.java
 ##
 @@ -43,4 +46,9 @@ public Key getKey() {
   public Object[] getValues() {
 return _values;
   }
+
+  @Override
+  public String toString() {
+return _key.toString() + " " + Arrays.deepToString(_values);
 
 Review comment:
   Will this String be used for deserialization? And can the key contain whitespace?
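
To illustrate the concern, here is a minimal sketch of why a space-delimited rendering is awkward to parse back. `RecordToStringDemo` and `parseKeyNaively` are hypothetical names, and the key is assumed to render like `Arrays.toString` over its columns; the actual `Key.toString()` in the PR may differ.

```java
import java.util.Arrays;

// Hypothetical stand-in for Record; the key format is an assumption, not the PR's Key.toString().
final class RecordToStringDemo {

  // Mirrors the shape in the diff: key text, a single space, then the values.
  static String render(Object[] keyColumns, Object[] values) {
    return Arrays.toString(keyColumns) + " " + Arrays.deepToString(values);
  }

  // Naive parse that treats the first space as the key/value separator.
  static String parseKeyNaively(String rendered) {
    return rendered.substring(0, rendered.indexOf(' '));
  }

  public static void main(String[] args) {
    String rendered = render(new Object[]{"New York", 2019}, new Object[]{10L, 2.5});
    System.out.println(rendered);                  // [New York, 2019] [10, 2.5]
    System.out.println(parseKeyNaively(rendered)); // [New
  }
}
```

If the string is only meant for logging and debugging, this ambiguity is harmless; it only matters if something later tries to split the output.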


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org



[GitHub] [incubator-pinot] mayankshriv commented on a change in pull request #4602: First pass of GROUP BY with ORDER BY support

2019-10-08 Thread GitBox
mayankshriv commented on a change in pull request #4602: First pass of GROUP BY 
with ORDER BY support
URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r332618635
 
 

 ##
 File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
 ##
 @@ -19,37 +19,68 @@
 package org.apache.pinot.core.data.table;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.annotation.Nonnull;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.order.OrderByUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Thread safe {@link Table} implementation for aggregating TableRecords based 
on combination of keys
  */
 public class ConcurrentIndexedTable extends IndexedTable {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConcurrentIndexedTable.class);
+
   private ConcurrentMap _lookupMap;
-  private Comparator _minHeapComparator;
   private ReentrantReadWriteLock _readWriteLock;
 
+  private boolean _isOrderBy;
+  private Comparator _orderByComparator;
+  private Comparator _finalOrderByComparator;
+  private List _aggregationIndexes;
+
+  private AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
+  private LongAdder _numResizes = new LongAdder();
+  private LongAccumulator _resizeTime = new LongAccumulator(Long::sum, 0);
+
+  /**
+   * Initializes the data structures and comparators needed for this Table
+   * @param dataSchema data schema of the record's keys and values
+   * @param aggregationInfos aggregation infors for the aggregations in 
record'd values
+   * @param orderBy list of {@link SelectionSort} defining the order by
+   * @param maxCapacity the max number of records to hold
+   * @param sort does final result need to be sorted
+   */
   @Override
   public void init(@Nonnull DataSchema dataSchema, List 
aggregationInfos, List orderBy,
-  int maxCapacity) {
-super.init(dataSchema, aggregationInfos, orderBy, maxCapacity);
+  int maxCapacity, boolean sort) {
+super.init(dataSchema, aggregationInfos, orderBy, maxCapacity, sort);
 
-_minHeapComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, 
orderBy, aggregationInfos).reversed();
 _lookupMap = new ConcurrentHashMap<>();
 _readWriteLock = new ReentrantReadWriteLock();
+_isOrderBy = CollectionUtils.isNotEmpty(orderBy);
+if (_isOrderBy) {
+  _orderByComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, 
orderBy, aggregationInfos, false);
 
 Review comment:
   Maybe add a TODO here in the code as well?





[GitHub] [incubator-pinot] mayankshriv commented on a change in pull request #4602: First pass of GROUP BY with ORDER BY support

2019-09-18 Thread GitBox
mayankshriv commented on a change in pull request #4602: First pass of GROUP BY 
with ORDER BY support
URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r325800046
 
 

 ##
 File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
 ##
 @@ -19,37 +19,65 @@
 package org.apache.pinot.core.data.table;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAccumulator;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.annotation.Nonnull;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.order.OrderByUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Thread safe {@link Table} implementation for aggregating TableRecords based 
on combination of keys
  */
 public class ConcurrentIndexedTable extends IndexedTable {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ConcurrentIndexedTable.class);
+
   private ConcurrentMap _lookupMap;
-  private Comparator _minHeapComparator;
   private ReentrantReadWriteLock _readWriteLock;
 
+  private boolean _isOrderBy;
+  private Comparator _minHeapComparator;
+  private Comparator _orderByComparator;
+
+  private AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
+  private LongAdder _numResizes = new LongAdder();
+  private LongAccumulator _resizeTime = new LongAccumulator(Long::sum, 0);
+
+  /**
+   * Initializes the data structures and comparators needed for this Table
+   * @param dataSchema data schema of the record's keys and values
+   * @param aggregationInfos aggregation infors for the aggregations in 
record'd values
+   * @param orderBy list of {@link SelectionSort} defining the order by
+   * @param maxCapacity the max number of records to hold
+   * @param sort does final result need to be sorted
+   */
   @Override
   public void init(@Nonnull DataSchema dataSchema, List 
aggregationInfos, List orderBy,
-  int maxCapacity) {
-super.init(dataSchema, aggregationInfos, orderBy, maxCapacity);
+  int maxCapacity, boolean sort) {
+super.init(dataSchema, aggregationInfos, orderBy, maxCapacity, sort);
 
-_minHeapComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, 
orderBy, aggregationInfos).reversed();
 _lookupMap = new ConcurrentHashMap<>();
 _readWriteLock = new ReentrantReadWriteLock();
+_isOrderBy = CollectionUtils.isNotEmpty(orderBy);
+if (_isOrderBy) {
+  _minHeapComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, 
orderBy, aggregationInfos).reversed();
 
 Review comment:
   Will this support order-by with ASC/DESC?
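
As a rough sketch of what ASC/DESC support could look like next to a reversed heap comparator: each order-by clause contributes a comparator that is flipped when the clause is DESC, and the bounded heap then uses the reverse of the combined comparator so that its head is the next row to evict. `OrderByHeapDemo`, `SortSpec`, and `topN` are hypothetical names for illustration; this is not the PR's `OrderByUtils`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class OrderByHeapDemo {

  // Hypothetical order-by clause: index into the row plus a direction flag.
  static final class SortSpec {
    final int columnIndex;
    final boolean ascending;

    SortSpec(int columnIndex, boolean ascending) {
      this.columnIndex = columnIndex;
      this.ascending = ascending;
    }
  }

  // Chains one comparator per clause, flipping the clauses marked DESC.
  @SuppressWarnings("unchecked")
  static Comparator<Object[]> orderByComparator(List<SortSpec> specs) {
    Comparator<Object[]> result = (a, b) -> 0;
    for (SortSpec spec : specs) {
      Comparator<Object[]> column =
          (a, b) -> ((Comparable<Object>) a[spec.columnIndex]).compareTo(b[spec.columnIndex]);
      result = result.thenComparing(spec.ascending ? column : column.reversed());
    }
    return result;
  }

  // Keeps the first `limit` rows in order-by order. The heap uses the reversed
  // comparator, so polling always drops the row that sorts last.
  static List<Object[]> topN(List<Object[]> rows, List<SortSpec> specs, int limit) {
    Comparator<Object[]> orderBy = orderByComparator(specs);
    PriorityQueue<Object[]> heap = new PriorityQueue<>(orderBy.reversed());
    for (Object[] row : rows) {
      heap.offer(row);
      if (heap.size() > limit) {
        heap.poll();
      }
    }
    List<Object[]> result = new ArrayList<>(heap);
    result.sort(orderBy);
    return result;
  }
}
```

With the direction folded into the order-by comparator itself, the single `reversed()` call for the heap stays valid regardless of how many clauses are ASC or DESC.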





[GitHub] [incubator-pinot] mayankshriv commented on a change in pull request #4602: First pass of GROUP BY with ORDER BY support

2019-09-18 Thread GitBox
mayankshriv commented on a change in pull request #4602: First pass of GROUP BY 
with ORDER BY support
URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r325797789
 
 

 ##
 File path: 
pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
 ##
 @@ -171,9 +171,12 @@ public ServerType getServerType() {
   public static final String SQL = "sql";
   public static final String TRACE = "trace";
   public static final String DEBUG_OPTIONS = "debugOptions";
+  public static final String QUERY_OPTIONS = "queryOptions";
 
 Review comment:
   I don't recall using it for partitioning, but it seems this was introduced in: 
https://github.com/apache/incubator-pinot/pull/1490
   





[GitHub] [incubator-pinot] mayankshriv commented on a change in pull request #4602: First pass of GROUP BY with ORDER BY support

2019-09-18 Thread GitBox
mayankshriv commented on a change in pull request #4602: First pass of GROUP BY 
with ORDER BY support
URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r325807516
 
 

 ##
 File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
 ##
 @@ -392,6 +434,218 @@ private void setAggregationResults(@Nonnull 
BrokerResponseNative brokerResponseN
 brokerResponseNative.setAggregationResults(reducedAggregationResults);
   }
 
+  /**
+   * Extract group by order by results and set into {@link ResultTable}
+   * @param brokerResponseNative broker response
+   * @param dataSchema data schema
+   * @param aggregationInfos aggregations info
+   * @param groupBy group by info
+   * @param orderBy order by info
+   * @param dataTableMap map from server to data table
+   */
+  private void setSQLGroupByOrderByResults(@Nonnull BrokerResponseNative 
brokerResponseNative,
+  @Nonnull DataSchema dataSchema, @Nonnull List 
aggregationInfos, @Nonnull GroupBy groupBy,
+  @Nonnull List orderBy, @Nonnull Map dataTableMap,
+  boolean preserveType) {
+
+List columns = new ArrayList<>(dataSchema.size());
+for (int i = 0; i < dataSchema.size(); i++) {
+  columns.add(dataSchema.getColumnName(i));
+}
+
+int numGroupBy = groupBy.getExpressionsSize();
+int numAggregations = aggregationInfos.size();
+
+IndexedTable indexedTable;
+try {
+  indexedTable =
+  getIndexedTable(numGroupBy, numAggregations, groupBy, 
aggregationInfos, orderBy, dataSchema, dataTableMap);
+} catch (Throwable throwable) {
+  throw new IllegalStateException(throwable);
+}
+
+List aggregationFunctions = new 
ArrayList<>(aggregationInfos.size());
+for (AggregationInfo aggregationInfo : aggregationInfos) {
+  aggregationFunctions.add(
+  
AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfo).getAggregationFunction());
+}
+
+List rows = new ArrayList<>();
+int numColumns = columns.size();
+Iterator sortedIterator = indexedTable.iterator();
+int numRows = 0;
+while (numRows < groupBy.getTopN() && sortedIterator.hasNext()) {
+
+  Record nextRecord = sortedIterator.next();
+  Serializable[] row = new Serializable[numColumns];
+  int index = 0;
+  for (Object keyColumn : nextRecord.getKey().getColumns()) {
+row[index ++] = getSerializableValue(keyColumn);
+  }
+  int aggNum = 0;
+  for (Object valueColumn : nextRecord.getValues()) {
+row[index] = 
getSerializableValue(aggregationFunctions.get(aggNum).extractFinalResult(valueColumn));
+if (preserveType) {
+  row[index] = AggregationFunctionUtils.formatValue(row[index]);
+}
+index ++;
+  }
+  rows.add(row);
+  numRows++;
+}
+
+brokerResponseNative.setResultTable(new ResultTable(columns, rows));
+  }
+
+  private IndexedTable getIndexedTable(int numGroupBy, int numAggregations, 
GroupBy groupBy,
+  List aggregationInfos, List orderBy, 
DataSchema dataSchema, Map dataTableMap)
+  throws Throwable {
+
+IndexedTable indexedTable = new ConcurrentIndexedTable();
+// setting a higher value to avoid frequent resizing
+int capacity = (int) Math.max(groupBy.getTopN(), 1);
+indexedTable.init(dataSchema, aggregationInfos, orderBy, capacity, true);
+
+for (DataTable dataTable : dataTableMap.values()) {
+  CheckedFunction2[] functions = new CheckedFunction2[dataSchema.size()];
+  for (int i = 0; i < dataSchema.size(); i++) {
+ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
+CheckedFunction2 function;
+switch (columnDataType) {
+
+  case INT:
+function = (CheckedFunction2) 
dataTable::getInt;
+break;
+  case LONG:
+function = (CheckedFunction2) 
dataTable::getLong;
+break;
+  case FLOAT:
+function = (CheckedFunction2) 
dataTable::getFloat;
+break;
+  case DOUBLE:
+function = (CheckedFunction2) 
dataTable::getDouble;
+break;
+  case STRING:
+function = (CheckedFunction2) 
dataTable::getString;
+break;
+  default:
+function = (CheckedFunction2) 
dataTable::getObject;
+}
+functions[i] = function;
+  }
+
+  for (int row = 0; row < dataTable.getNumberOfRows(); row++) {
+Object[] key = new Object[numGroupBy];
+int col = 0;
+for (int j = 0; j < numGroupBy; j++) {
+  key[j] = functions[col].apply(row, col);
+  col ++;
+}
+Object[] value = new Object[numAggregations];
+for (int j = 0; j < numAggregations; j++) {
+  value[j] = functions[col].apply(row, col);
+  col ++;
+}
+Record record = new Record(new Key(key), value);
+indexedTable.upsert(record);
+  }
+}
+

[GitHub] [incubator-pinot] mayankshriv commented on a change in pull request #4602: First pass of GROUP BY with ORDER BY support

2019-09-18 Thread GitBox
mayankshriv commented on a change in pull request #4602: First pass of GROUP BY 
with ORDER BY support
URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r325795302
 
 

 ##
 File path: 
pinot-common/src/main/java/org/apache/pinot/common/response/broker/ResultTable.java
 ##
 @@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.broker;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import java.io.Serializable;
+import java.util.List;
+
+
+/**
+ * Hosts the results in a standard tabular structure
+ */
+@JsonPropertyOrder({"columns", "results"})
 
 Review comment:
   Nit: why not s/results/rows, especially when the member variable is called 
_rows?





[GitHub] [incubator-pinot] mayankshriv commented on a change in pull request #4602: First pass of GROUP BY with ORDER BY support

2019-09-18 Thread GitBox
mayankshriv commented on a change in pull request #4602: First pass of GROUP BY 
with ORDER BY support
URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r325806695
 
 

 ##
 File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
 ##
 @@ -246,22 +264,46 @@ public BrokerResponseNative reduceOnDataTable(@Nonnull 
BrokerRequest brokerReque
 // Aggregation query.
 AggregationFunction[] aggregationFunctions =
 
AggregationFunctionUtils.getAggregationFunctions(brokerRequest.getAggregationsInfo());
+
 if (!brokerRequest.isSetGroupBy()) {
   // Aggregation only query.
   setAggregationResults(brokerResponseNative, aggregationFunctions, 
dataTableMap, cachedDataSchema,
   preserveType);
-} else {
-  // Aggregation group-by query.
-  boolean[] aggregationFunctionSelectStatus =
-  
AggregationFunctionUtils.getAggregationFunctionsSelectStatus(brokerRequest.getAggregationsInfo());
-  setGroupByHavingResults(brokerResponseNative, aggregationFunctions, 
aggregationFunctionSelectStatus,
-  brokerRequest.getGroupBy(), dataTableMap, 
brokerRequest.getHavingFilterQuery(),
-  brokerRequest.getHavingFilterSubQueryMap(), preserveType);
-  if (brokerMetrics != null && 
(!brokerResponseNative.getAggregationResults().isEmpty())) {
-// We emit the group by size when the result isn't empty. All the 
sizes among group-by results should be the same.
-// Thus, we can just emit the one from the 1st result.
-brokerMetrics.addMeteredQueryValue(brokerRequest, 
BrokerMeter.GROUP_BY_SIZE,
-
brokerResponseNative.getAggregationResults().get(0).getGroupByResult().size());
+} else { // Aggregation group-by query.
 
 Review comment:
   Maybe factor this out into a utility function?





[GitHub] [incubator-pinot] mayankshriv commented on a change in pull request #4602: First pass of GROUP BY with ORDER BY support

2019-09-18 Thread GitBox
mayankshriv commented on a change in pull request #4602: First pass of GROUP BY 
with ORDER BY support
URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r325805705
 
 

 ##
 File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOperator.java
 ##
 @@ -58,6 +64,34 @@ public AggregationGroupByOperator(@Nonnull 
AggregationFunctionContext[] function
 _transformOperator = transformOperator;
 _numTotalRawDocs = numTotalRawDocs;
 _useStarTree = useStarTree;
+
+int numColumns = groupBy.getExpressionsSize() + _functionContexts.length;
 
 Review comment:
   Seems like this is a change in the default path, purely due to the signature 
change for the constructor of IntermediateResultBlock? Ideally, I'd want no 
changes to the default path while this project is still ongoing. Of course, we 
can take smart risks depending on the change. I'll let you assess that for this 
case.
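
One common way to leave a default path untouched when a constructor gains a parameter is to keep the old signature as an overload that delegates to the new one. The sketch below assumes that approach; `ResultBlockDemo` and its fields are hypothetical and do not reflect the real IntermediateResultBlock.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a result block; not the real IntermediateResultBlock.
final class ResultBlockDemo {
  private final List<String> aggregationResults;
  private final List<String> columnNames; // extra state needed only by the new feature

  // Existing signature, kept so current callers compile and behave as before.
  ResultBlockDemo(List<String> aggregationResults) {
    this(aggregationResults, Collections.emptyList());
  }

  // New signature, used only by the new code path.
  ResultBlockDemo(List<String> aggregationResults, List<String> columnNames) {
    this.aggregationResults = aggregationResults;
    this.columnNames = columnNames;
  }

  List<String> getAggregationResults() {
    return aggregationResults;
  }

  List<String> getColumnNames() {
    return columnNames;
  }
}
```

Whether this is worth the extra constructor depends on how many default-path call sites the signature change would otherwise touch.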





[GitHub] [incubator-pinot] mayankshriv commented on a change in pull request #4602: First pass of GROUP BY with ORDER BY support

2019-09-18 Thread GitBox
mayankshriv commented on a change in pull request #4602: First pass of GROUP BY 
with ORDER BY support
URL: https://github.com/apache/incubator-pinot/pull/4602#discussion_r325804508
 
 

 ##
 File path: 
pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
 ##
 @@ -204,18 +215,25 @@ public BrokerResponseNative reduceOnDataTable(@Nonnull 
BrokerRequest brokerReque
 }
 
 // Parse the option from request whether to preserve the type
-String preserveTypeString = (brokerRequest.getQueryOptions() == null) ? 
"false" : brokerRequest.getQueryOptions()
-
.getOrDefault(CommonConstants.Broker.Request.QueryOptionKey.PRESERVE_TYPE, 
"false");
+Map queryOptions = brokerRequest.getQueryOptions();
+String preserveTypeString =
+(queryOptions == null) ? "false" : 
queryOptions.getOrDefault(QueryOptionKey.PRESERVE_TYPE, "false");
 boolean preserveType = Boolean.valueOf(preserveTypeString);
 
 if (dataTableMap.isEmpty()) {
-  // For empty data table map, construct empty result using the cached 
data schema.
-
-  // This will only happen to selection query.
+  // even though results empty, set columns for these operations
   if (cachedDataSchema != null) {
-List selectionColumns = SelectionOperatorUtils
-
.getSelectionColumns(brokerRequest.getSelections().getSelectionColumns(), 
cachedDataSchema);
-brokerResponseNative.setSelectionResults(new 
SelectionResults(selectionColumns, new ArrayList<>(0)));
+if (brokerRequest.isSetSelections()) {
+  List selectionColumns =
+  
SelectionOperatorUtils.getSelectionColumns(brokerRequest.getSelections().getSelectionColumns(),
+  cachedDataSchema);
+  brokerResponseNative.setSelectionResults(new 
SelectionResults(selectionColumns, new ArrayList<>(0)));
+} else if (brokerRequest.isSetGroupBy() && queryOptions != null && 
SQL.equalsIgnoreCase(
+queryOptions.get(QueryOptionKey.GROUP_BY_MODE)) && 
SQL.equalsIgnoreCase(
+queryOptions.get(QueryOptionKey.RESPONSE_FORMAT))) {
 
 Review comment:
   Is RESPONSE_FORMAT something that the user specifies? If so, are we supporting 
query type SQL with response type PQL, and other such combinations? Not sure how 
that would be useful; why not just dictate the response based on the query type 
(in which case we don't need this extra option)?
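
A minimal sketch of the alternative suggested here, where the response format is derived from the query mode rather than read from a second option. The class name, the `"groupByMode"` key string, and the `"pql"` default are assumptions for illustration; the real constants live in `QueryOptionKey` and may differ.

```java
import java.util.Map;

final class ResponseFormatDemo {
  static final String SQL = "sql";
  static final String PQL = "pql"; // assumed default value
  // Hypothetical key string; the real constant is QueryOptionKey.GROUP_BY_MODE.
  static final String GROUP_BY_MODE = "groupByMode";

  // Null-safe lookup so callers do not repeat the `queryOptions == null` check.
  static String option(Map<String, String> queryOptions, String key, String defaultValue) {
    return queryOptions == null ? defaultValue : queryOptions.getOrDefault(key, defaultValue);
  }

  // The response format follows the query mode; no separate format option is consulted.
  static String responseFormat(Map<String, String> queryOptions) {
    return SQL.equalsIgnoreCase(option(queryOptions, GROUP_BY_MODE, PQL)) ? SQL : PQL;
  }
}
```

With a single option, mixed combinations such as SQL execution with a PQL-shaped response cannot be requested, which removes the need to define or test them.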

