JackieTien97 commented on code in PR #8613:
URL: https://github.com/apache/iotdb/pull/8613#discussion_r1058888630


##########
server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java:
##########
@@ -123,23 +128,107 @@ public class MergeSortComparator {
             : deviceComparing;
       };
 
-  public static Comparator<MergeSortKey> getComparator(List<SortItem> 
sortItemList) {
-    if (sortItemList.get(0).getOrdering() == Ordering.ASC) {
-      if (sortItemList.get(1).getOrdering() == Ordering.ASC) {
-        if (sortItemList.get(0).getSortKey() == SortKey.TIME) return 
ASC_TIME_ASC_DEVICE;
-        else return ASC_DEVICE_ASC_TIME;
-      } else {
-        if (sortItemList.get(0).getSortKey() == SortKey.TIME) return 
ASC_TIME_DESC_DEVICE;
-        else return ASC_DEVICE_DESC_TIME;
-      }
-    } else {
-      if (sortItemList.get(1).getOrdering() == Ordering.ASC) {
-        if (sortItemList.get(0).getSortKey() == SortKey.TIME) return 
DESC_TIME_ASC_DEVICE;
-        else return DESC_DEVICE_ASC_TIME;
+  public static Comparator<MergeSortKey> getComparator(
+      List<SortItem> sortItemList, List<Integer> indexList, List<TSDataType> 
dataTypeList) {
+    // specified for order by time, device or order by device, time
+    if (sortItemList.size() == 2
+        && ((sortItemList.get(0).getSortKey() == SortKey.TIME
+                && sortItemList.get(1).getSortKey() == SortKey.DEVICE)
+            || (sortItemList.get(0).getSortKey() == SortKey.DEVICE
+                && sortItemList.get(1).getSortKey() == SortKey.TIME))) {
+      if (sortItemList.get(0).getOrdering() == Ordering.ASC) {
+        if (sortItemList.get(1).getOrdering() == Ordering.ASC) {
+          if (sortItemList.get(0).getSortKey() == SortKey.TIME) return 
ASC_TIME_ASC_DEVICE;
+          else return ASC_DEVICE_ASC_TIME;
+        } else {
+          if (sortItemList.get(0).getSortKey() == SortKey.TIME) return 
ASC_TIME_DESC_DEVICE;
+          else return ASC_DEVICE_DESC_TIME;
+        }
       } else {
-        if (sortItemList.get(0).getSortKey() == SortKey.TIME) return 
DESC_TIME_DESC_DEVICE;
-        else return DESC_DEVICE_DESC_TIME;
+        if (sortItemList.get(1).getOrdering() == Ordering.ASC) {
+          if (sortItemList.get(0).getSortKey() == SortKey.TIME) return 
DESC_TIME_ASC_DEVICE;
+          else return DESC_DEVICE_ASC_TIME;
+        } else {
+          if (sortItemList.get(0).getSortKey() == SortKey.TIME) return 
DESC_TIME_DESC_DEVICE;
+          else return DESC_DEVICE_DESC_TIME;
+        }
       }
+    } else { // generally order by
+      // use code-gen compile this comparator
+
+      // currently only show queries use merge sort and will only contain 
TIME, QUERYID, DATANODEID,
+      // ELAPSEDTIME, STATEMENT
+      return genComparatorChain(sortItemList, indexList, dataTypeList);
+    }
+  }
+
+  /** @param indexList -1 for time column */
+  private static ComparatorChain<MergeSortKey> genComparatorChain(
+      List<SortItem> sortItemList, List<Integer> indexList, List<TSDataType> 
dataTypeList) {
+    List<Comparator<MergeSortKey>> list = new ArrayList<>(indexList.size());
+    for (int i = 0; i < indexList.size(); i++) {
+      int index = indexList.get(i);
+      TSDataType dataType = dataTypeList.get(i);
+      boolean asc = sortItemList.get(i).getOrdering() == Ordering.ASC;
+      list.add(genSingleComparator(asc, index, dataType));
     }
+    return new ComparatorChain<>(list);
+  }
+
+  private static Comparator<MergeSortKey> genSingleComparator(
+      boolean asc, int index, TSDataType dataType) {
+    Comparator<MergeSortKey> comparator;
+    switch (dataType) {
+      case INT32:
+        comparator =
+            Comparator.comparingInt(
+                (MergeSortKey sortKey) ->
+                    sortKey.tsBlock.getColumn(index).getInt(sortKey.rowIndex));
+        break;
+      case INT64:
+        if (index == -1) {
+          comparator =
+              Comparator.comparingLong(
+                  (MergeSortKey sortKey) -> 
sortKey.tsBlock.getTimeByIndex(sortKey.rowIndex));
+        } else {
+          comparator =
+              Comparator.comparingLong(
+                  (MergeSortKey sortKey) ->
+                      
sortKey.tsBlock.getColumn(index).getLong(sortKey.rowIndex));
+        }
+        break;
+      case FLOAT:
+        comparator =
+            Comparator.comparingDouble(
+                (MergeSortKey sortKey) ->
+                    
sortKey.tsBlock.getColumn(index).getFloat(sortKey.rowIndex));
+        break;
+      case DOUBLE:
+        comparator =
+            Comparator.comparingDouble(
+                (MergeSortKey sortKey) ->
+                    
sortKey.tsBlock.getColumn(index).getDouble(sortKey.rowIndex));
+        break;
+      case TEXT:
+        comparator =
+            Comparator.comparing(
+                (MergeSortKey sortKey) ->
+                    
sortKey.tsBlock.getColumn(index).getBinary(sortKey.rowIndex));
+        break;
+      default:
+        throw new IllegalArgumentException("Data type: " + dataType + " cannot 
be ordered");
+    }
+    if (!asc) {
+      comparator = comparator.reversed();
+    }
+    return comparator;
+  }
+
+  public static void main() {
+    Comparator<MergeSortKey> comparator =
+        Comparator.comparing(
+            (MergeSortKey sortKey) -> 
sortKey.tsBlock.getColumn(1).getBinary(sortKey.rowIndex));
+    List<MergeSortKey> data = Arrays.asList(new MergeSortKey(null, 5));
+    data.sort(comparator);

Review Comment:
   Please remove this leftover `main()` debugging method.
   ```suggestion
   ```



##########
server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java:
##########
@@ -23,26 +23,21 @@
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class MergeSortNode extends MultiChildProcessNode {
 
   private final OrderByParameter mergeOrderParameter;
 
-  private final List<String> outputColumns;

Review Comment:
   Please add the `outputColumns` field back; it is an optimization for align by device.



##########
server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java:
##########
@@ -252,22 +253,19 @@ public PlanNode visitMergeSort(MergeSortNode node, 
NodeGroupContext context) {
       mergeSortNodeList.add(mergeSortNode);
     }
 
-    return groupPlanNodeByMergeSortNode(
-        mergeSortNodeList, node.getOutputColumnNames(), 
node.getMergeOrderParameter(), context);
+    return groupPlanNodeByMergeSortNode(mergeSortNodeList, 
node.getMergeOrderParameter(), context);
   }
 
   private PlanNode groupPlanNodeByMergeSortNode(
       List<PlanNode> mergeSortNodeList,
-      List<String> outputColumns,
       OrderByParameter orderByParameter,
       NodeGroupContext context) {
     if (mergeSortNodeList.size() == 1) {
       return mergeSortNodeList.get(0);
     }
 
     MergeSortNode mergeSortNode =
-        new MergeSortNode(
-            context.queryContext.getQueryId().genPlanNodeId(), 
orderByParameter, outputColumns);

Review Comment:
   Please add the `outputColumns` parameter back to this `MergeSortNode` constructor call.



##########
server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java:
##########
@@ -192,9 +190,7 @@ public List<PlanNode> visitDeviceView(DeviceViewNode node, 
DistributionPlanConte
 
     MergeSortNode mergeSortNode =
         new MergeSortNode(
-            context.queryContext.getQueryId().genPlanNodeId(),
-            node.getMergeOrderParameter(),
-            node.getOutputColumnNames());

Review Comment:
   Please add the `node.getOutputColumnNames()` argument back to this `MergeSortNode` constructor call.



##########
server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java:
##########
@@ -239,9 +242,7 @@ public PlanNode visitMergeSort(MergeSortNode node, 
NodeGroupContext context) {
       }
       MergeSortNode mergeSortNode =
           new MergeSortNode(
-              context.queryContext.getQueryId().genPlanNodeId(),
-              node.getMergeOrderParameter(),
-              node.getOutputColumnNames());

Review Comment:
   Please add the `node.getOutputColumnNames()` argument back to this `MergeSortNode` constructor call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to