JackieTien97 commented on code in PR #16890:
URL: https://github.com/apache/iotdb/pull/16890#discussion_r2605367832


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java:
##########
@@ -67,6 +67,15 @@ public String getId() {
     return id;
   }
 
+  public int getDataNodeId() {
+    return getDataNodeId(id);
+  }
+
+  public static int getDataNodeId(String queryId) {
+    String[] splits = queryId.split("_");
+    return Integer.parseInt(splits[splits.length - 1]);
+  }
+

Review Comment:
   ```suggestion
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -203,12 +216,28 @@ public class Coordinator {
 
   private final ConcurrentHashMap<Long, IQueryExecution> queryExecutionMap;
 
+  private final BlockingDeque<QueryInfo> currentQueriesInfo = new 
LinkedBlockingDeque<>();
+  private final AtomicInteger[] currentQueriesCostHistogram = new 
AtomicInteger[61];
+  private final ScheduledExecutorService retryFailTasksExecutor =
+      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+          ThreadName.EXPIRED_QUERIES_INFO_CLEAR.getName());
+
   private final StatementRewrite statementRewrite;
   private final List<PlanOptimizer> logicalPlanOptimizers;
   private final List<PlanOptimizer> distributionPlanOptimizers;
   private final DataNodeLocationSupplierFactory.DataNodeLocationSupplier 
dataNodeLocationSupplier;
   private final TypeManager typeManager;
 
+  {
+    for (int i = 0; i < 61; i++) {
+      currentQueriesCostHistogram[i] = new AtomicInteger();
+    }
+
+    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+        retryFailTasksExecutor, this::clearExpiredQueriesInfoTask, 5, 5, 
TimeUnit.MILLISECONDS);

Review Comment:
   1 second check frequency is enough.
   ```suggestion
           retryFailTasksExecutor, this::clearExpiredQueriesInfoTask, 1_000, 
1_000, TimeUnit.MILLISECONDS);
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java:
##########
@@ -1181,4 +1183,140 @@ public boolean hasNext() {
       return sessionConnectionIterator.hasNext();
     }
   }
+
+  private static class CurrentQueriesSupplier extends TsBlockSupplier {
+    protected int nextConsumedIndex;
+    private List<Coordinator.StatedQueriesInfo> queriesInfo;
+
+    private CurrentQueriesSupplier(final List<TSDataType> dataTypes, final 
UserEntity userEntity) {
+      super(dataTypes);
+      queriesInfo = Coordinator.getInstance().getCurrentQueriesInfo();
+      try {
+        accessControl.checkUserGlobalSysPrivilege(userEntity);
+      } catch (final AccessDeniedException e) {
+        queriesInfo =
+            queriesInfo.stream()
+                .filter(iQueryInfo -> 
userEntity.getUsername().equals(iQueryInfo.getUser()))
+                .collect(Collectors.toList());
+      }
+    }
+
+    @Override
+    protected void constructLine() {
+      final Coordinator.StatedQueriesInfo queryInfo = 
queriesInfo.get(nextConsumedIndex);
+      
columnBuilders[0].writeBinary(BytesUtils.valueOf(queryInfo.getQueryId()));
+      
columnBuilders[1].writeBinary(BytesUtils.valueOf(queryInfo.getQueryState()));
+      columnBuilders[2].writeLong(
+          TimestampPrecisionUtils.convertToCurrPrecision(
+              queryInfo.getStartTime(), TimeUnit.MILLISECONDS));
+      if (queryInfo.getEndTime() == Coordinator.QueryInfo.DEFAULT_END_TIME) {
+        columnBuilders[3].appendNull();
+      } else {
+        columnBuilders[3].writeLong(
+            TimestampPrecisionUtils.convertToCurrPrecision(
+                queryInfo.getEndTime(), TimeUnit.MILLISECONDS));
+      }
+      
columnBuilders[4].writeInt(QueryId.getDataNodeId(queryInfo.getQueryId()));
+      columnBuilders[5].writeFloat(queryInfo.getCostTime());
+      
columnBuilders[6].writeBinary(BytesUtils.valueOf(queryInfo.getStatement()));
+      columnBuilders[7].writeBinary(BytesUtils.valueOf(queryInfo.getUser()));
+      
columnBuilders[8].writeBinary(BytesUtils.valueOf(queryInfo.getClientHost()));
+      resultBuilder.declarePosition();
+      nextConsumedIndex++;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return nextConsumedIndex < queriesInfo.size();
+    }
+  }
+
+  private static class QueriesCostsHistogramSupplier extends TsBlockSupplier {
+    protected int nextConsumedIndex;
+    private static final Binary[] BUCKETS =
+        new Binary[] {
+          BytesUtils.valueOf("[0,1)"),
+          BytesUtils.valueOf("[1,2)"),
+          BytesUtils.valueOf("[2,3)"),
+          BytesUtils.valueOf("[3,4)"),
+          BytesUtils.valueOf("[4,5)"),
+          BytesUtils.valueOf("[5,6)"),
+          BytesUtils.valueOf("[6,7)"),
+          BytesUtils.valueOf("[7,8)"),
+          BytesUtils.valueOf("[8,9)"),
+          BytesUtils.valueOf("[9,10)"),
+          BytesUtils.valueOf("[10,11)"),
+          BytesUtils.valueOf("[11,12)"),
+          BytesUtils.valueOf("[12,13)"),
+          BytesUtils.valueOf("[13,14)"),
+          BytesUtils.valueOf("[14,15)"),
+          BytesUtils.valueOf("[15,16)"),
+          BytesUtils.valueOf("[16,17)"),
+          BytesUtils.valueOf("[17,18)"),
+          BytesUtils.valueOf("[18,19)"),
+          BytesUtils.valueOf("[19,20)"),
+          BytesUtils.valueOf("[20,21)"),
+          BytesUtils.valueOf("[21,22)"),
+          BytesUtils.valueOf("[22,23)"),
+          BytesUtils.valueOf("[23,24)"),
+          BytesUtils.valueOf("[24,25)"),
+          BytesUtils.valueOf("[25,26)"),
+          BytesUtils.valueOf("[26,27)"),
+          BytesUtils.valueOf("[27,28)"),
+          BytesUtils.valueOf("[28,29)"),
+          BytesUtils.valueOf("[29,30)"),
+          BytesUtils.valueOf("[30,31)"),
+          BytesUtils.valueOf("[31,32)"),
+          BytesUtils.valueOf("[32,33)"),
+          BytesUtils.valueOf("[33,34)"),
+          BytesUtils.valueOf("[34,35)"),
+          BytesUtils.valueOf("[35,36)"),
+          BytesUtils.valueOf("[36,37)"),
+          BytesUtils.valueOf("[37,38)"),
+          BytesUtils.valueOf("[38,39)"),
+          BytesUtils.valueOf("[39,40)"),
+          BytesUtils.valueOf("[40,41)"),
+          BytesUtils.valueOf("[41,42)"),
+          BytesUtils.valueOf("[42,43)"),
+          BytesUtils.valueOf("[43,44)"),
+          BytesUtils.valueOf("[44,45)"),
+          BytesUtils.valueOf("[45,46)"),
+          BytesUtils.valueOf("[46,47)"),
+          BytesUtils.valueOf("[47,48)"),
+          BytesUtils.valueOf("[48,49)"),
+          BytesUtils.valueOf("[49,50)"),
+          BytesUtils.valueOf("[50,51)"),
+          BytesUtils.valueOf("[51,52)"),
+          BytesUtils.valueOf("[52,53)"),
+          BytesUtils.valueOf("[53,54)"),
+          BytesUtils.valueOf("[54,55)"),
+          BytesUtils.valueOf("[55,56)"),
+          BytesUtils.valueOf("[56,57)"),
+          BytesUtils.valueOf("[57,58)"),
+          BytesUtils.valueOf("[58,59)"),
+          BytesUtils.valueOf("[59,60)"),
+          BytesUtils.valueOf("60+")
+        };
+    private int[] currentQueriesCostHistogram;

Review Comment:
   ```suggestion
       private final int[] currentQueriesCostHistogram;
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java:
##########
@@ -67,6 +67,15 @@ public String getId() {
     return id;
   }
 
+  public int getDataNodeId() {
+    return getDataNodeId(id);

Review Comment:
   init a static field datanode id = 
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), then directly return 
that



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java:
##########
@@ -1181,4 +1183,140 @@ public boolean hasNext() {
       return sessionConnectionIterator.hasNext();
     }
   }
+
+  private static class CurrentQueriesSupplier extends TsBlockSupplier {
+    protected int nextConsumedIndex;
+    private List<Coordinator.StatedQueriesInfo> queriesInfo;
+
+    private CurrentQueriesSupplier(final List<TSDataType> dataTypes, final 
UserEntity userEntity) {
+      super(dataTypes);
+      queriesInfo = Coordinator.getInstance().getCurrentQueriesInfo();
+      try {
+        accessControl.checkUserGlobalSysPrivilege(userEntity);
+      } catch (final AccessDeniedException e) {
+        queriesInfo =
+            queriesInfo.stream()
+                .filter(iQueryInfo -> 
userEntity.getUsername().equals(iQueryInfo.getUser()))
+                .collect(Collectors.toList());
+      }
+    }
+
+    @Override
+    protected void constructLine() {
+      final Coordinator.StatedQueriesInfo queryInfo = 
queriesInfo.get(nextConsumedIndex);
+      
columnBuilders[0].writeBinary(BytesUtils.valueOf(queryInfo.getQueryId()));
+      
columnBuilders[1].writeBinary(BytesUtils.valueOf(queryInfo.getQueryState()));
+      columnBuilders[2].writeLong(
+          TimestampPrecisionUtils.convertToCurrPrecision(
+              queryInfo.getStartTime(), TimeUnit.MILLISECONDS));
+      if (queryInfo.getEndTime() == Coordinator.QueryInfo.DEFAULT_END_TIME) {
+        columnBuilders[3].appendNull();
+      } else {
+        columnBuilders[3].writeLong(
+            TimestampPrecisionUtils.convertToCurrPrecision(
+                queryInfo.getEndTime(), TimeUnit.MILLISECONDS));
+      }
+      
columnBuilders[4].writeInt(QueryId.getDataNodeId(queryInfo.getQueryId()));
+      columnBuilders[5].writeFloat(queryInfo.getCostTime());
+      
columnBuilders[6].writeBinary(BytesUtils.valueOf(queryInfo.getStatement()));
+      columnBuilders[7].writeBinary(BytesUtils.valueOf(queryInfo.getUser()));
+      
columnBuilders[8].writeBinary(BytesUtils.valueOf(queryInfo.getClientHost()));
+      resultBuilder.declarePosition();
+      nextConsumedIndex++;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return nextConsumedIndex < queriesInfo.size();
+    }
+  }
+
+  private static class QueriesCostsHistogramSupplier extends TsBlockSupplier {
+    protected int nextConsumedIndex;

Review Comment:
   ```suggestion
       private int nextConsumedIndex;
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -722,4 +763,233 @@ public 
DataNodeLocationSupplierFactory.DataNodeLocationSupplier getDataNodeLocat
   public ExecutorService getDispatchExecutor() {
     return dispatchExecutor;
   }
+
+  /** record query info in memory data structure */
+  public void recordCurrentQueries(
+      String queryId,
+      long startTime,
+      long endTime,
+      long costTimeInNs,
+      String statement,
+      String user,
+      String clientHost) {
+    if (CONFIG.getQueryCostStatWindow() <= 0) {
+      return;
+    }
+
+    if (queryId == null) {
+      // fast Last query API executeFastLastDataQueryForOnePrefixPath will 
enter this
+      queryId = queryIdGenerator.createNextQueryId().getId();
+    }
+
+    // ns -> s
+    float costTimeInSeconds = costTimeInNs * 1e-9f;
+
+    QueryInfo queryInfo =
+        new QueryInfo(queryId, startTime, endTime, costTimeInSeconds, 
statement, user, clientHost);
+
+    while 
(!coordinatorMemoryBlock.allocate(RamUsageEstimator.sizeOfObject(queryInfo))) {
+      // try to release memory from the head of queue
+      QueryInfo queryInfoToRelease = currentQueriesInfo.poll();
+      if (queryInfoToRelease == null) {
+        // no element in the queue and the memory is still not enough, skip 
this record
+        return;
+      } else {
+        // release memory and unrecord in histogram
+        
coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfoToRelease));
+        unrecordInHistogram(queryInfoToRelease.costTime);
+      }
+    }
+
+    currentQueriesInfo.addLast(queryInfo);
+    recordInHistogram(costTimeInSeconds);
+  }
+
+  private void recordInHistogram(float costTimeInSeconds) {
+    int bucket = (int) costTimeInSeconds;
+    if (bucket < 60) {
+      currentQueriesCostHistogram[bucket].getAndIncrement();
+    } else {
+      currentQueriesCostHistogram[60].getAndIncrement();
+    }
+  }
+
+  private void unrecordInHistogram(float costTimeInSeconds) {
+    int bucket = (int) costTimeInSeconds;
+    if (bucket < 60) {
+      currentQueriesCostHistogram[bucket].getAndDecrement();
+    } else {
+      currentQueriesCostHistogram[60].getAndDecrement();
+    }
+  }
+
+  private void clearExpiredQueriesInfoTask() {
+    int queryCostStatWindow = CONFIG.getQueryCostStatWindow();
+    if (queryCostStatWindow <= 0) {
+      return;
+    }
+
+    // the QueryInfo smaller than expired time will be cleared
+    long expiredTime = System.currentTimeMillis() - queryCostStatWindow * 60 * 
1_000;
+    // peek head, the head QueryInfo is in the time window, return directly
+    QueryInfo queryInfo = currentQueriesInfo.peekFirst();
+    if (queryInfo.endTime >= expiredTime) {
+      return;
+    }
+
+    queryInfo = currentQueriesInfo.poll();
+    while (queryInfo != null) {
+      if (queryInfo.endTime < expiredTime) {
+        // out of time window, clear queryInfo
+        
coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfo));
+        unrecordInHistogram(queryInfo.costTime);
+        queryInfo = currentQueriesInfo.poll();
+      } else {
+        // the head of the queue is not expired, add back
+        currentQueriesInfo.addFirst(queryInfo);
+        //  there is no more candidate to clear
+        return;
+      }
+    }
+  }
+
+  public List<StatedQueriesInfo> getCurrentQueriesInfo() {
+    List<IQueryExecution> runningQueries = getAllQueryExecutions();
+    Set<String> runningQueryIdSet =
+        
runningQueries.stream().map(IQueryExecution::getQueryId).collect(Collectors.toSet());
+    List<StatedQueriesInfo> result = new ArrayList<>();
+
+    // add History queries (satisfy the time window) info
+    Iterator<QueryInfo> historyQueriesIterator = currentQueriesInfo.iterator();
+    Set<String> repetitionQueryIdSet = new HashSet<>();
+    long currentTime = System.currentTimeMillis();
+    long needRecordTime = currentTime - CONFIG.getQueryCostStatWindow() * 60 * 
1_000;

Review Comment:
   ```suggestion
       long needRecordTime = currentTime - CONFIG.getQueryCostStatWindow() * 60 
* 1_000L;
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java:
##########
@@ -201,14 +206,11 @@ protected void constructLine() {
       final IQueryExecution queryExecution = 
queryExecutions.get(nextConsumedIndex);
 
       if 
(queryExecution.getSQLDialect().equals(IClientSession.SqlDialect.TABLE)) {
-        final String[] splits = queryExecution.getQueryId().split("_");
-        final int dataNodeId = Integer.parseInt(splits[splits.length - 1]);
-
         
columnBuilders[0].writeBinary(BytesUtils.valueOf(queryExecution.getQueryId()));
         columnBuilders[1].writeLong(
             TimestampPrecisionUtils.convertToCurrPrecision(
                 queryExecution.getStartExecutionTime(), 
TimeUnit.MILLISECONDS));
-        columnBuilders[2].writeInt(dataNodeId);
+        
columnBuilders[2].writeInt(QueryId.getDataNodeId(queryExecution.getQueryId()));

Review Comment:
   use the static field



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -625,8 +654,20 @@ public void cleanupQueryExecution(
       try (SetThreadName threadName = new 
SetThreadName(queryExecution.getQueryId())) {
         LOGGER.debug("[CleanUpQuery]]");
         queryExecution.stopAndCleanup(t);
+        boolean isUserQuery = queryExecution.isQuery() && 
queryExecution.isUserQuery();
+        if (isUserQuery) {
+          recordCurrentQueries(
+              queryExecution.getQueryId(),
+              queryExecution.getStartExecutionTime(),
+              queryExecution.getStartExecutionTime()
+                  + queryExecution.getTotalExecutionTime() / 1_000_000L,

Review Comment:
   ```suggestion
                 System.currentTimeMillis(),
   ```
   endTime should be current time, endTime-startTime don't need to be equal to 
getTotalExecutionTime, `getTotalExecutionTime` is all rpc latentcy sum.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -722,4 +763,233 @@ public 
DataNodeLocationSupplierFactory.DataNodeLocationSupplier getDataNodeLocat
   public ExecutorService getDispatchExecutor() {
     return dispatchExecutor;
   }
+
+  /** record query info in memory data structure */
+  public void recordCurrentQueries(
+      String queryId,
+      long startTime,
+      long endTime,
+      long costTimeInNs,
+      String statement,
+      String user,
+      String clientHost) {
+    if (CONFIG.getQueryCostStatWindow() <= 0) {
+      return;
+    }
+
+    if (queryId == null) {
+      // fast Last query API executeFastLastDataQueryForOnePrefixPath will 
enter this
+      queryId = queryIdGenerator.createNextQueryId().getId();
+    }
+
+    // ns -> s
+    float costTimeInSeconds = costTimeInNs * 1e-9f;
+
+    QueryInfo queryInfo =
+        new QueryInfo(queryId, startTime, endTime, costTimeInSeconds, 
statement, user, clientHost);
+
+    while 
(!coordinatorMemoryBlock.allocate(RamUsageEstimator.sizeOfObject(queryInfo))) {
+      // try to release memory from the head of queue
+      QueryInfo queryInfoToRelease = currentQueriesInfo.poll();
+      if (queryInfoToRelease == null) {
+        // no element in the queue and the memory is still not enough, skip 
this record
+        return;
+      } else {
+        // release memory and unrecord in histogram
+        
coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfoToRelease));
+        unrecordInHistogram(queryInfoToRelease.costTime);
+      }
+    }
+
+    currentQueriesInfo.addLast(queryInfo);
+    recordInHistogram(costTimeInSeconds);
+  }
+
+  private void recordInHistogram(float costTimeInSeconds) {
+    int bucket = (int) costTimeInSeconds;
+    if (bucket < 60) {
+      currentQueriesCostHistogram[bucket].getAndIncrement();
+    } else {
+      currentQueriesCostHistogram[60].getAndIncrement();
+    }
+  }
+
+  private void unrecordInHistogram(float costTimeInSeconds) {
+    int bucket = (int) costTimeInSeconds;
+    if (bucket < 60) {
+      currentQueriesCostHistogram[bucket].getAndDecrement();
+    } else {
+      currentQueriesCostHistogram[60].getAndDecrement();
+    }
+  }
+
+  private void clearExpiredQueriesInfoTask() {
+    int queryCostStatWindow = CONFIG.getQueryCostStatWindow();
+    if (queryCostStatWindow <= 0) {
+      return;
+    }
+
+    // the QueryInfo smaller than expired time will be cleared
+    long expiredTime = System.currentTimeMillis() - queryCostStatWindow * 60 * 
1_000;
+    // peek head, the head QueryInfo is in the time window, return directly
+    QueryInfo queryInfo = currentQueriesInfo.peekFirst();
+    if (queryInfo.endTime >= expiredTime) {
+      return;
+    }
+
+    queryInfo = currentQueriesInfo.poll();
+    while (queryInfo != null) {
+      if (queryInfo.endTime < expiredTime) {
+        // out of time window, clear queryInfo
+        
coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfo));
+        unrecordInHistogram(queryInfo.costTime);
+        queryInfo = currentQueriesInfo.poll();
+      } else {
+        // the head of the queue is not expired, add back
+        currentQueriesInfo.addFirst(queryInfo);
+        //  there is no more candidate to clear
+        return;
+      }
+    }
+  }
+
+  public List<StatedQueriesInfo> getCurrentQueriesInfo() {
+    List<IQueryExecution> runningQueries = getAllQueryExecutions();
+    Set<String> runningQueryIdSet =
+        
runningQueries.stream().map(IQueryExecution::getQueryId).collect(Collectors.toSet());
+    List<StatedQueriesInfo> result = new ArrayList<>();
+
+    // add History queries (satisfy the time window) info
+    Iterator<QueryInfo> historyQueriesIterator = currentQueriesInfo.iterator();
+    Set<String> repetitionQueryIdSet = new HashSet<>();
+    long currentTime = System.currentTimeMillis();
+    long needRecordTime = currentTime - CONFIG.getQueryCostStatWindow() * 60 * 
1_000;
+    while (historyQueriesIterator.hasNext()) {
+      QueryInfo queryInfo = historyQueriesIterator.next();
+      if (queryInfo.endTime < needRecordTime) {
+        // out of time window, ignore it
+      } else {
+        if (runningQueryIdSet.contains(queryInfo.queryId)) {
+          repetitionQueryIdSet.add(queryInfo.queryId);
+        }
+        result.add(new StatedQueriesInfo(QueryState.FINISHED, queryInfo));
+      }
+    }
+
+    // add Running queries info after remove the repetitions which has 
recorded in History queries
+    result.addAll(
+        runningQueries.stream()
+            .filter(queryExecution -> 
!repetitionQueryIdSet.contains(queryExecution.getQueryId()))
+            .map(
+                queryExecution ->
+                    new StatedQueriesInfo(
+                        QueryState.RUNNING,
+                        queryExecution.getQueryId(),
+                        queryExecution.getStartExecutionTime(),
+                        DEFAULT_END_TIME,
+                        (currentTime - queryExecution.getStartExecutionTime()) 
/ 1000,
+                        queryExecution.getExecuteSQL().orElse("UNKNOWN"),
+                        queryExecution.getUser(),
+                        queryExecution.getClientHostname()))
+            .collect(Collectors.toList()));
+    return result;
+  }
+
+  public int[] getCurrentQueriesCostHistogram() {
+    return 
Arrays.stream(currentQueriesCostHistogram).mapToInt(AtomicInteger::get).toArray();
+  }
+
+  public static class QueryInfo {

Review Comment:
   implements Accountable



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -722,4 +763,233 @@ public 
DataNodeLocationSupplierFactory.DataNodeLocationSupplier getDataNodeLocat
   public ExecutorService getDispatchExecutor() {
     return dispatchExecutor;
   }
+
+  /** record query info in memory data structure */
+  public void recordCurrentQueries(
+      String queryId,
+      long startTime,
+      long endTime,
+      long costTimeInNs,
+      String statement,
+      String user,
+      String clientHost) {
+    if (CONFIG.getQueryCostStatWindow() <= 0) {
+      return;
+    }
+
+    if (queryId == null) {
+      // fast Last query API executeFastLastDataQueryForOnePrefixPath will 
enter this
+      queryId = queryIdGenerator.createNextQueryId().getId();
+    }
+
+    // ns -> s
+    float costTimeInSeconds = costTimeInNs * 1e-9f;
+
+    QueryInfo queryInfo =
+        new QueryInfo(queryId, startTime, endTime, costTimeInSeconds, 
statement, user, clientHost);
+
+    while 
(!coordinatorMemoryBlock.allocate(RamUsageEstimator.sizeOfObject(queryInfo))) {
+      // try to release memory from the head of queue
+      QueryInfo queryInfoToRelease = currentQueriesInfo.poll();
+      if (queryInfoToRelease == null) {
+        // no element in the queue and the memory is still not enough, skip 
this record
+        return;
+      } else {
+        // release memory and unrecord in histogram
+        
coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfoToRelease));
+        unrecordInHistogram(queryInfoToRelease.costTime);
+      }
+    }
+
+    currentQueriesInfo.addLast(queryInfo);
+    recordInHistogram(costTimeInSeconds);
+  }
+
+  private void recordInHistogram(float costTimeInSeconds) {
+    int bucket = (int) costTimeInSeconds;
+    if (bucket < 60) {
+      currentQueriesCostHistogram[bucket].getAndIncrement();
+    } else {
+      currentQueriesCostHistogram[60].getAndIncrement();
+    }
+  }
+
+  private void unrecordInHistogram(float costTimeInSeconds) {
+    int bucket = (int) costTimeInSeconds;
+    if (bucket < 60) {
+      currentQueriesCostHistogram[bucket].getAndDecrement();
+    } else {
+      currentQueriesCostHistogram[60].getAndDecrement();
+    }
+  }
+
+  private void clearExpiredQueriesInfoTask() {
+    int queryCostStatWindow = CONFIG.getQueryCostStatWindow();
+    if (queryCostStatWindow <= 0) {
+      return;
+    }
+
+    // the QueryInfo smaller than expired time will be cleared
+    long expiredTime = System.currentTimeMillis() - queryCostStatWindow * 60 * 
1_000;
+    // peek head, the head QueryInfo is in the time window, return directly
+    QueryInfo queryInfo = currentQueriesInfo.peekFirst();
+    if (queryInfo.endTime >= expiredTime) {
+      return;
+    }
+
+    queryInfo = currentQueriesInfo.poll();
+    while (queryInfo != null) {
+      if (queryInfo.endTime < expiredTime) {
+        // out of time window, clear queryInfo
+        
coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfo));
+        unrecordInHistogram(queryInfo.costTime);
+        queryInfo = currentQueriesInfo.poll();
+      } else {
+        // the head of the queue is not expired, add back
+        currentQueriesInfo.addFirst(queryInfo);
+        //  there is no more candidate to clear
+        return;
+      }
+    }
+  }
+
+  public List<StatedQueriesInfo> getCurrentQueriesInfo() {
+    List<IQueryExecution> runningQueries = getAllQueryExecutions();
+    Set<String> runningQueryIdSet =
+        
runningQueries.stream().map(IQueryExecution::getQueryId).collect(Collectors.toSet());
+    List<StatedQueriesInfo> result = new ArrayList<>();
+
+    // add History queries (satisfy the time window) info
+    Iterator<QueryInfo> historyQueriesIterator = currentQueriesInfo.iterator();
+    Set<String> repetitionQueryIdSet = new HashSet<>();
+    long currentTime = System.currentTimeMillis();
+    long needRecordTime = currentTime - CONFIG.getQueryCostStatWindow() * 60 * 
1_000;
+    while (historyQueriesIterator.hasNext()) {
+      QueryInfo queryInfo = historyQueriesIterator.next();
+      if (queryInfo.endTime < needRecordTime) {
+        // out of time window, ignore it
+      } else {
+        if (runningQueryIdSet.contains(queryInfo.queryId)) {
+          repetitionQueryIdSet.add(queryInfo.queryId);
+        }
+        result.add(new StatedQueriesInfo(QueryState.FINISHED, queryInfo));
+      }
+    }
+
+    // add Running queries info after remove the repetitions which has 
recorded in History queries
+    result.addAll(
+        runningQueries.stream()
+            .filter(queryExecution -> 
!repetitionQueryIdSet.contains(queryExecution.getQueryId()))
+            .map(
+                queryExecution ->
+                    new StatedQueriesInfo(
+                        QueryState.RUNNING,
+                        queryExecution.getQueryId(),
+                        queryExecution.getStartExecutionTime(),
+                        DEFAULT_END_TIME,
+                        (currentTime - queryExecution.getStartExecutionTime()) 
/ 1000,
+                        queryExecution.getExecuteSQL().orElse("UNKNOWN"),
+                        queryExecution.getUser(),
+                        queryExecution.getClientHostname()))
+            .collect(Collectors.toList()));
+    return result;
+  }
+
+  public int[] getCurrentQueriesCostHistogram() {
+    return 
Arrays.stream(currentQueriesCostHistogram).mapToInt(AtomicInteger::get).toArray();
+  }
+
+  public static class QueryInfo {

Review Comment:
     private static final long INSTANCE_SIZE = 
shallowSizeOfInstance(QueryInfo.class);
   
   then calculate statement, user, clientHost each time, refer to 
`TimeSeriesMetadataCacheKey`, string size can be calculated by 
sizeOfCharArray(XXX.length());



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -625,8 +654,20 @@ public void cleanupQueryExecution(
       try (SetThreadName threadName = new 
SetThreadName(queryExecution.getQueryId())) {
         LOGGER.debug("[CleanUpQuery]]");
         queryExecution.stopAndCleanup(t);
+        boolean isUserQuery = queryExecution.isQuery() && 
queryExecution.isUserQuery();
+        if (isUserQuery) {
+          recordCurrentQueries(
+              queryExecution.getQueryId(),
+              queryExecution.getStartExecutionTime(),
+              queryExecution.getStartExecutionTime()
+                  + queryExecution.getTotalExecutionTime() / 1_000_000L,
+              queryExecution.getTotalExecutionTime(),
+              queryExecution.getExecuteSQL().orElse("UNKNOWN"),

Review Comment:
   better to use `ContentOfQuerySupplier` which is the same as slow query 
recorder.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -722,4 +763,233 @@ public 
DataNodeLocationSupplierFactory.DataNodeLocationSupplier getDataNodeLocat
   public ExecutorService getDispatchExecutor() {
     return dispatchExecutor;
   }
+
+  /** record query info in memory data structure */
+  public void recordCurrentQueries(
+      String queryId,
+      long startTime,
+      long endTime,
+      long costTimeInNs,
+      String statement,
+      String user,
+      String clientHost) {
+    if (CONFIG.getQueryCostStatWindow() <= 0) {
+      return;
+    }
+
+    if (queryId == null) {
+      // fast Last query API executeFastLastDataQueryForOnePrefixPath will 
enter this
+      queryId = queryIdGenerator.createNextQueryId().getId();
+    }
+
+    // ns -> s
+    float costTimeInSeconds = costTimeInNs * 1e-9f;
+
+    QueryInfo queryInfo =
+        new QueryInfo(queryId, startTime, endTime, costTimeInSeconds, 
statement, user, clientHost);
+
+    while 
(!coordinatorMemoryBlock.allocate(RamUsageEstimator.sizeOfObject(queryInfo))) {
+      // try to release memory from the head of queue
+      QueryInfo queryInfoToRelease = currentQueriesInfo.poll();
+      if (queryInfoToRelease == null) {
+        // no element in the queue and the memory is still not enough, skip 
this record
+        return;
+      } else {
+        // release memory and unrecord in histogram
+        
coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfoToRelease));
+        unrecordInHistogram(queryInfoToRelease.costTime);
+      }
+    }
+
+    currentQueriesInfo.addLast(queryInfo);
+    recordInHistogram(costTimeInSeconds);
+  }
+
+  private void recordInHistogram(float costTimeInSeconds) {
+    int bucket = (int) costTimeInSeconds;
+    if (bucket < 60) {
+      currentQueriesCostHistogram[bucket].getAndIncrement();
+    } else {
+      currentQueriesCostHistogram[60].getAndIncrement();
+    }
+  }
+
+  private void unrecordInHistogram(float costTimeInSeconds) {
+    int bucket = (int) costTimeInSeconds;
+    if (bucket < 60) {
+      currentQueriesCostHistogram[bucket].getAndDecrement();
+    } else {
+      currentQueriesCostHistogram[60].getAndDecrement();
+    }
+  }
+
+  private void clearExpiredQueriesInfoTask() {
+    int queryCostStatWindow = CONFIG.getQueryCostStatWindow();
+    if (queryCostStatWindow <= 0) {
+      return;
+    }
+
+    // the QueryInfo smaller than expired time will be cleared
+    long expiredTime = System.currentTimeMillis() - queryCostStatWindow * 60 * 
1_000;

Review Comment:
   ```suggestion
       long expiredTime = System.currentTimeMillis() - queryCostStatWindow * 60 
* 1_000L;
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -722,4 +763,233 @@ public 
DataNodeLocationSupplierFactory.DataNodeLocationSupplier getDataNodeLocat
   public ExecutorService getDispatchExecutor() {
     return dispatchExecutor;
   }
+
+  /** record query info in memory data structure */
+  public void recordCurrentQueries(
+      String queryId,
+      long startTime,
+      long endTime,
+      long costTimeInNs,
+      String statement,

Review Comment:
   ```suggestion
         Supplier<String> contentOfQuerySupplier,
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java:
##########
@@ -1181,4 +1183,140 @@ public boolean hasNext() {
       return sessionConnectionIterator.hasNext();
     }
   }
+
+  private static class CurrentQueriesSupplier extends TsBlockSupplier {
+    protected int nextConsumedIndex;

Review Comment:
   ```suggestion
       private int nextConsumedIndex;
   ```



##########
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template:
##########
@@ -1080,6 +1080,11 @@ max_tsblock_line_number=1000
 # Datatype: long
 slow_query_threshold=10000
 
+# Time window threshold(min) for record of history queries.
+# effectiveMode: hot_reload
+# Datatype: int
+query_cost_stat_window=0

Review Comment:
   ```suggestion
   # Privilege: SYSTEM
   query_cost_stat_window=0
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -722,4 +763,233 @@ public 
DataNodeLocationSupplierFactory.DataNodeLocationSupplier getDataNodeLocat
   public ExecutorService getDispatchExecutor() {
     return dispatchExecutor;
   }
+
+  /** record query info in memory data structure */
+  public void recordCurrentQueries(
+      String queryId,
+      long startTime,
+      long endTime,
+      long costTimeInNs,
+      String statement,
+      String user,
+      String clientHost) {
+    if (CONFIG.getQueryCostStatWindow() <= 0) {
+      return;
+    }
+
+    if (queryId == null) {
+      // fast Last query API executeFastLastDataQueryForOnePrefixPath will 
enter this
+      queryId = queryIdGenerator.createNextQueryId().getId();
+    }
+
+    // ns -> s
+    float costTimeInSeconds = costTimeInNs * 1e-9f;
+
+    QueryInfo queryInfo =
+        new QueryInfo(queryId, startTime, endTime, costTimeInSeconds, 
statement, user, clientHost);
+
+    while 
(!coordinatorMemoryBlock.allocate(RamUsageEstimator.sizeOfObject(queryInfo))) {
+      // try to release memory from the head of queue
+      QueryInfo queryInfoToRelease = currentQueriesInfo.poll();
+      if (queryInfoToRelease == null) {
+        // no element in the queue and the memory is still not enough, skip 
this record
+        return;
+      } else {
+        // release memory and unrecord in histogram
+        
coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfoToRelease));
+        unrecordInHistogram(queryInfoToRelease.costTime);
+      }
+    }
+
+    currentQueriesInfo.addLast(queryInfo);
+    recordInHistogram(costTimeInSeconds);
+  }
+
+  private void recordInHistogram(float costTimeInSeconds) {
+    int bucket = (int) costTimeInSeconds;
+    if (bucket < 60) {
+      currentQueriesCostHistogram[bucket].getAndIncrement();
+    } else {
+      currentQueriesCostHistogram[60].getAndIncrement();
+    }
+  }
+
+  private void unrecordInHistogram(float costTimeInSeconds) {
+    int bucket = (int) costTimeInSeconds;
+    if (bucket < 60) {
+      currentQueriesCostHistogram[bucket].getAndDecrement();
+    } else {
+      currentQueriesCostHistogram[60].getAndDecrement();
+    }
+  }
+
+  private void clearExpiredQueriesInfoTask() {
+    int queryCostStatWindow = CONFIG.getQueryCostStatWindow();
+    if (queryCostStatWindow <= 0) {
+      return;
+    }
+
+    // the QueryInfo smaller than expired time will be cleared
+    long expiredTime = System.currentTimeMillis() - queryCostStatWindow * 60 * 
1_000;
+    // peek head, the head QueryInfo is in the time window, return directly
+    QueryInfo queryInfo = currentQueriesInfo.peekFirst();
+    if (queryInfo.endTime >= expiredTime) {

Review Comment:
   ```suggestion
       if (queryInfo == null || queryInfo.endTime >= expiredTime) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to