mayankshriv commented on a change in pull request #4578: Refactor 
TransformBlockDataFetcher so that DISTINCT and reuse it
URL: https://github.com/apache/incubator-pinot/pull/4578#discussion_r320903302
 
 

 ##########
 File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOperator.java
 ##########
 @@ -42,212 +39,168 @@
 import org.apache.pinot.core.operator.transform.TransformBlockDataFetcher;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
-import org.apache.pinot.core.segment.index.readers.Dictionary;
 
 
-/**
- * This SelectionOnlyOperator will take care of applying a selection query to 
one IndexSegment.
- * nextBlock() will return an IntermediateResultBlock for the given 
IndexSegment.
- */
 public class SelectionOperator extends BaseOperator<IntermediateResultsBlock> {
-
-  private static final String OPERATOR_NAME = "SelectionOnlyOperator";
+  private static final String OPERATOR_NAME = "SelectionOperator";
 
   private final IndexSegment _indexSegment;
   private final TransformOperator _transformOperator;
-  private final DataSchema _dataSchema;
-  private final BlockValSet[] _blockValSets;
+  private final List<SelectionSort> _sortSequence;
+  private final List<TransformExpressionTree> _expressions;
   private final int _limit;
-  private Selection _selection;
-  private final int _offset;
   private final int _maxRows;
-  private final List<TransformExpressionTree> _expressions;
-  private final List<TransformExpressionTree> _selectExpressions;
-  private final List<TransformExpressionTree> _orderByExpressions;
-  private final List<Integer> _orderByIndices;
-  private final TransformResultMetadata[] _expressionResultMetadata;
-  private final Dictionary[] _dictionaries;
-  private Collection<Serializable[]> _rowEvents;
-  private PriorityQueue<Serializable[]> _priorityQueue;
+  private final Collection<Serializable[]> _rows;
+  private final PriorityQueue<Serializable[]> _priorityQueue;
+  private final BlockValSet[] _blockValSets;
+  private final DataSchema _dataSchema;
 
   private ExecutionStatistics _executionStatistics;
 
-  public SelectionOperator(IndexSegment indexSegment, Selection selection, 
TransformOperator transformOperator) {
+  /**
+   * NOTE: The expressions in the transform operator has the sort expressions 
at the front. The sort sequence is the
+   * deduplicated sort expressions.
+   */
+  public SelectionOperator(IndexSegment indexSegment, Selection selection, 
TransformOperator transformOperator,
+      List<SelectionSort> sortSequence) {
     _indexSegment = indexSegment;
-    _offset = selection.getOffset();
-    _limit = selection.getSize();
-    _selection = selection;
-    _maxRows = _offset + _limit;
     _transformOperator = transformOperator;
+    _sortSequence = sortSequence;
     _expressions = _transformOperator.getExpressions();
+    _limit = selection.getSize();
 
-    List<String> selectColumns = selection.getSelectionColumns();
-    if (selectColumns.size() == 1 && selectColumns.get(0).equals("*")) {
-      selectColumns = new LinkedList<>(indexSegment.getPhysicalColumnNames());
-    }
-
-    List<String> orderByColumns = new ArrayList<>();
-    if (selection.getSelectionSortSequence() != null) {
-      for (SelectionSort selectionSort : selection.getSelectionSortSequence()) 
{
-        String expression = selectionSort.getColumn();
-        orderByColumns.add(expression);
-      }
-    }
-    _selectExpressions = new ArrayList<>();
-    _orderByExpressions = new ArrayList<>();
-    _orderByIndices = new ArrayList<>();
-    for (int i = 0; i < _expressions.size(); i++) {
-      TransformExpressionTree expression = _expressions.get(i);
-      if (selectColumns.contains(expression.toString())) {
-        _selectExpressions.add(expression);
-      }
-    }
-
-    for (String orderByColumn : orderByColumns) {
-      for (int i = 0; i < _expressions.size(); i++) {
-        TransformExpressionTree expression = _expressions.get(i);
-        if (orderByColumn.equalsIgnoreCase(expression.toString())) {
-          _orderByExpressions.add(expression);
-          _orderByIndices.add(i);
-        }
-      }
+    boolean selectionOnly = sortSequence.isEmpty();
+    if (selectionOnly) {
+      // Selection only
+      _maxRows = _limit;
+      _rows = new ArrayList<>(_maxRows);
+      _priorityQueue = null;
+    } else {
+      // Selection order-by
+      _maxRows = selection.getOffset() + _limit;
+      _rows = null;
+      _priorityQueue = new PriorityQueue<>(_maxRows, getComparator());
     }
 
-    _blockValSets = new BlockValSet[_expressions.size()];
-    _expressionResultMetadata = new 
TransformResultMetadata[_expressions.size()];
-    _dictionaries = new Dictionary[_expressions.size()];
-    DataSchema.ColumnDataType[] columnDataTypes = new 
DataSchema.ColumnDataType[_expressions.size()];
-    String[] columnNames = new String[_expressions.size()];
-    for (int i = 0; i < _expressions.size(); i++) {
+    int numExpressions = _expressions.size();
+    _blockValSets = new BlockValSet[numExpressions];
+    String[] columnNames = new String[numExpressions];
+    ColumnDataType[] columnDataTypes = new ColumnDataType[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
       TransformExpressionTree expression = _expressions.get(i);
-      _expressionResultMetadata[i] = 
_transformOperator.getResultMetadata(expression);
-      columnDataTypes[i] = DataSchema.ColumnDataType
-          .fromDataType(_expressionResultMetadata[i].getDataType(), 
_expressionResultMetadata[i].isSingleValue());
       columnNames[i] = expression.toString();
-      if (_expressionResultMetadata[i].hasDictionary()) {
-        _dictionaries[i] = _transformOperator.getDictionary(expression);
-      }
+      TransformResultMetadata resultMetadata = 
_transformOperator.getResultMetadata(expression);
+      columnDataTypes[i] = 
ColumnDataType.fromDataType(resultMetadata.getDataType(), 
resultMetadata.isSingleValue());
     }
     _dataSchema = new DataSchema(columnNames, columnDataTypes);
-    if (_orderByExpressions.isEmpty()) {
-      _rowEvents = new ArrayList<>();
-    } else {
-      Comparator<Serializable[]> comparator = getStrictComparator();
-      _priorityQueue = new PriorityQueue<>(_maxRows, comparator);
-    }
   }
 
-  private Comparator<Serializable[]> getStrictComparator() {
-    return new Comparator<Serializable[]>() {
-      @Override
-      public int compare(Serializable[] o1, Serializable[] o2) {
-        List<SelectionSort> sortSequence = 
_selection.getSelectionSortSequence();
-        int numSortColumns = sortSequence.size();
-        for (int i = 0; i < numSortColumns; i++) {
-          int ret = 0;
-          SelectionSort selectionSort = sortSequence.get(i);
-          int index = _orderByIndices.get(i);
-          // Only compare single-value columns.
-          if (!_expressionResultMetadata[index].isSingleValue()) {
-            continue;
-          }
-
-          Serializable v1 = o1[index];
-          Serializable v2 = o2[index];
-
-          DataType dataType = _expressionResultMetadata[index].getDataType();
-          switch (dataType) {
-            case INT:
-              if (!selectionSort.isIsAsc()) {
-                ret = ((Integer) v1).compareTo((Integer) v2);
-              } else {
-                ret = ((Integer) v2).compareTo((Integer) v1);
-              }
-              break;
-            case LONG:
-              if (!selectionSort.isIsAsc()) {
-                ret = ((Long) v1).compareTo((Long) v2);
-              } else {
-                ret = ((Long) v2).compareTo((Long) v1);
-              }
-              break;
-            case FLOAT:
-              if (!selectionSort.isIsAsc()) {
-                ret = ((Float) v1).compareTo((Float) v2);
-              } else {
-                ret = ((Float) v2).compareTo((Float) v1);
-              }
-              break;
-            case DOUBLE:
-              if (!selectionSort.isIsAsc()) {
-                ret = ((Double) v1).compareTo((Double) v2);
-              } else {
-                ret = ((Double) v2).compareTo((Double) v1);
-              }
-              break;
-            case BOOLEAN:
-            case STRING:
-              if (!selectionSort.isIsAsc()) {
-                ret = ((String) v1).compareTo((String) v2);
-              } else {
-                ret = ((String) v2).compareTo((String) v1);
-              }
-              break;
-            case BYTES:
-              if (!selectionSort.isIsAsc()) {
-                ret = ByteArray.compare((byte[]) v1, (byte[]) v2);
-              } else {
-                ret = ByteArray.compare((byte[]) v2, (byte[]) v1);
-              }
-            default:
-              break;
-          }
+  // TODO: Optimize the comparator by comparing dictionary ids
+  private Comparator<Serializable[]> getComparator() {
+    return (o1, o2) -> {
+      int numSortExpressions = _sortSequence.size();
+      for (int i = 0; i < numSortExpressions; i++) {
+        // Only compare single-value columns
+        if (!_blockValSets[i].isSingleValue()) {
+          continue;
+        }
 
-          if (ret != 0) {
-            return ret;
-          }
+        Serializable v1 = o1[i];
+        Serializable v2 = o2[i];
+
+        int ret = 0;
+        SelectionSort selectionSort = _sortSequence.get(i);
+        switch (_blockValSets[i].getValueType()) {
+          case INT:
+            if (!selectionSort.isIsAsc()) {
 
 Review comment:
  Nit: can this not be replaced by a one-liner, e.g.:
  `ret = !selectionSort.isIsAsc() ? ((Integer) v1).compareTo((Integer) v2) : ((Integer) v2).compareTo((Integer) v1);`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to