JackieTien97 commented on code in PR #16890:
URL: https://github.com/apache/iotdb/pull/16890#discussion_r2605367832
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java:
##########
@@ -67,6 +67,15 @@ public String getId() {
return id;
}
+ public int getDataNodeId() {
+ return getDataNodeId(id);
+ }
+
+ public static int getDataNodeId(String queryId) {
+ String[] splits = queryId.split("_");
+ return Integer.parseInt(splits[splits.length - 1]);
+ }
+
Review Comment:
```suggestion
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -203,12 +216,28 @@ public class Coordinator {
private final ConcurrentHashMap<Long, IQueryExecution> queryExecutionMap;
+ private final BlockingDeque<QueryInfo> currentQueriesInfo = new LinkedBlockingDeque<>();
+ private final AtomicInteger[] currentQueriesCostHistogram = new AtomicInteger[61];
+ private final ScheduledExecutorService retryFailTasksExecutor =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.EXPIRED_QUERIES_INFO_CLEAR.getName());
+
private final StatementRewrite statementRewrite;
private final List<PlanOptimizer> logicalPlanOptimizers;
private final List<PlanOptimizer> distributionPlanOptimizers;
private final DataNodeLocationSupplierFactory.DataNodeLocationSupplier dataNodeLocationSupplier;
private final TypeManager typeManager;
+ {
+ for (int i = 0; i < 61; i++) {
+ currentQueriesCostHistogram[i] = new AtomicInteger();
+ }
+
+ ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+ retryFailTasksExecutor, this::clearExpiredQueriesInfoTask, 5, 5, TimeUnit.MILLISECONDS);
Review Comment:
A 1-second check frequency is enough.
```suggestion
retryFailTasksExecutor, this::clearExpiredQueriesInfoTask, 1_000, 1_000, TimeUnit.MILLISECONDS);
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java:
##########
@@ -1181,4 +1183,140 @@ public boolean hasNext() {
return sessionConnectionIterator.hasNext();
}
}
+
+ private static class CurrentQueriesSupplier extends TsBlockSupplier {
+ protected int nextConsumedIndex;
+ private List<Coordinator.StatedQueriesInfo> queriesInfo;
+
+ private CurrentQueriesSupplier(final List<TSDataType> dataTypes, final UserEntity userEntity) {
+ super(dataTypes);
+ queriesInfo = Coordinator.getInstance().getCurrentQueriesInfo();
+ try {
+ accessControl.checkUserGlobalSysPrivilege(userEntity);
+ } catch (final AccessDeniedException e) {
+ queriesInfo =
+ queriesInfo.stream()
+ .filter(iQueryInfo -> userEntity.getUsername().equals(iQueryInfo.getUser()))
+ .collect(Collectors.toList());
+ }
+ }
+
+ @Override
+ protected void constructLine() {
+ final Coordinator.StatedQueriesInfo queryInfo = queriesInfo.get(nextConsumedIndex);
+ columnBuilders[0].writeBinary(BytesUtils.valueOf(queryInfo.getQueryId()));
+ columnBuilders[1].writeBinary(BytesUtils.valueOf(queryInfo.getQueryState()));
+ columnBuilders[2].writeLong(
+ TimestampPrecisionUtils.convertToCurrPrecision(
+ queryInfo.getStartTime(), TimeUnit.MILLISECONDS));
+ if (queryInfo.getEndTime() == Coordinator.QueryInfo.DEFAULT_END_TIME) {
+ columnBuilders[3].appendNull();
+ } else {
+ columnBuilders[3].writeLong(
+ TimestampPrecisionUtils.convertToCurrPrecision(
+ queryInfo.getEndTime(), TimeUnit.MILLISECONDS));
+ }
+ columnBuilders[4].writeInt(QueryId.getDataNodeId(queryInfo.getQueryId()));
+ columnBuilders[5].writeFloat(queryInfo.getCostTime());
+ columnBuilders[6].writeBinary(BytesUtils.valueOf(queryInfo.getStatement()));
+ columnBuilders[7].writeBinary(BytesUtils.valueOf(queryInfo.getUser()));
+ columnBuilders[8].writeBinary(BytesUtils.valueOf(queryInfo.getClientHost()));
+ resultBuilder.declarePosition();
+ nextConsumedIndex++;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nextConsumedIndex < queriesInfo.size();
+ }
+ }
+
+ private static class QueriesCostsHistogramSupplier extends TsBlockSupplier {
+ protected int nextConsumedIndex;
+ private static final Binary[] BUCKETS =
+ new Binary[] {
+ BytesUtils.valueOf("[0,1)"),
+ BytesUtils.valueOf("[1,2)"),
+ BytesUtils.valueOf("[2,3)"),
+ BytesUtils.valueOf("[3,4)"),
+ BytesUtils.valueOf("[4,5)"),
+ BytesUtils.valueOf("[5,6)"),
+ BytesUtils.valueOf("[6,7)"),
+ BytesUtils.valueOf("[7,8)"),
+ BytesUtils.valueOf("[8,9)"),
+ BytesUtils.valueOf("[9,10)"),
+ BytesUtils.valueOf("[10,11)"),
+ BytesUtils.valueOf("[11,12)"),
+ BytesUtils.valueOf("[12,13)"),
+ BytesUtils.valueOf("[13,14)"),
+ BytesUtils.valueOf("[14,15)"),
+ BytesUtils.valueOf("[15,16)"),
+ BytesUtils.valueOf("[16,17)"),
+ BytesUtils.valueOf("[17,18)"),
+ BytesUtils.valueOf("[18,19)"),
+ BytesUtils.valueOf("[19,20)"),
+ BytesUtils.valueOf("[20,21)"),
+ BytesUtils.valueOf("[21,22)"),
+ BytesUtils.valueOf("[22,23)"),
+ BytesUtils.valueOf("[23,24)"),
+ BytesUtils.valueOf("[24,25)"),
+ BytesUtils.valueOf("[25,26)"),
+ BytesUtils.valueOf("[26,27)"),
+ BytesUtils.valueOf("[27,28)"),
+ BytesUtils.valueOf("[28,29)"),
+ BytesUtils.valueOf("[29,30)"),
+ BytesUtils.valueOf("[30,31)"),
+ BytesUtils.valueOf("[31,32)"),
+ BytesUtils.valueOf("[32,33)"),
+ BytesUtils.valueOf("[33,34)"),
+ BytesUtils.valueOf("[34,35)"),
+ BytesUtils.valueOf("[35,36)"),
+ BytesUtils.valueOf("[36,37)"),
+ BytesUtils.valueOf("[37,38)"),
+ BytesUtils.valueOf("[38,39)"),
+ BytesUtils.valueOf("[39,40)"),
+ BytesUtils.valueOf("[40,41)"),
+ BytesUtils.valueOf("[41,42)"),
+ BytesUtils.valueOf("[42,43)"),
+ BytesUtils.valueOf("[43,44)"),
+ BytesUtils.valueOf("[44,45)"),
+ BytesUtils.valueOf("[45,46)"),
+ BytesUtils.valueOf("[46,47)"),
+ BytesUtils.valueOf("[47,48)"),
+ BytesUtils.valueOf("[48,49)"),
+ BytesUtils.valueOf("[49,50)"),
+ BytesUtils.valueOf("[50,51)"),
+ BytesUtils.valueOf("[51,52)"),
+ BytesUtils.valueOf("[52,53)"),
+ BytesUtils.valueOf("[53,54)"),
+ BytesUtils.valueOf("[54,55)"),
+ BytesUtils.valueOf("[55,56)"),
+ BytesUtils.valueOf("[56,57)"),
+ BytesUtils.valueOf("[57,58)"),
+ BytesUtils.valueOf("[58,59)"),
+ BytesUtils.valueOf("[59,60)"),
+ BytesUtils.valueOf("60+")
+ };
+ private int[] currentQueriesCostHistogram;
Review Comment:
```suggestion
private final int[] currentQueriesCostHistogram;
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/QueryId.java:
##########
@@ -67,6 +67,15 @@ public String getId() {
return id;
}
+ public int getDataNodeId() {
+ return getDataNodeId(id);
Review Comment:
Initialize a static field `dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId()` and return that directly.
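Roughly, a sketch of that change (field name illustrative; assumes `IoTDBDescriptor` is accessible from `QueryId`):
```java
// Sketch for QueryId: cache the locally configured DataNode id once and return it directly,
// instead of re-parsing it from the generated query id string.
private static final int LOCAL_DATA_NODE_ID =
    IoTDBDescriptor.getInstance().getConfig().getDataNodeId();

public int getDataNodeId() {
  // Query ids handled here are generated on this node, so the configured id is the right answer.
  return LOCAL_DATA_NODE_ID;
}
```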
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java:
##########
@@ -1181,4 +1183,140 @@ public boolean hasNext() {
return sessionConnectionIterator.hasNext();
}
}
+
+ private static class CurrentQueriesSupplier extends TsBlockSupplier {
+ protected int nextConsumedIndex;
+ private List<Coordinator.StatedQueriesInfo> queriesInfo;
+
+ private CurrentQueriesSupplier(final List<TSDataType> dataTypes, final UserEntity userEntity) {
+ super(dataTypes);
+ queriesInfo = Coordinator.getInstance().getCurrentQueriesInfo();
+ try {
+ accessControl.checkUserGlobalSysPrivilege(userEntity);
+ } catch (final AccessDeniedException e) {
+ queriesInfo =
+ queriesInfo.stream()
+ .filter(iQueryInfo -> userEntity.getUsername().equals(iQueryInfo.getUser()))
+ .collect(Collectors.toList());
+ }
+ }
+
+ @Override
+ protected void constructLine() {
+ final Coordinator.StatedQueriesInfo queryInfo = queriesInfo.get(nextConsumedIndex);
+ columnBuilders[0].writeBinary(BytesUtils.valueOf(queryInfo.getQueryId()));
+ columnBuilders[1].writeBinary(BytesUtils.valueOf(queryInfo.getQueryState()));
+ columnBuilders[2].writeLong(
+ TimestampPrecisionUtils.convertToCurrPrecision(
+ queryInfo.getStartTime(), TimeUnit.MILLISECONDS));
+ if (queryInfo.getEndTime() == Coordinator.QueryInfo.DEFAULT_END_TIME) {
+ columnBuilders[3].appendNull();
+ } else {
+ columnBuilders[3].writeLong(
+ TimestampPrecisionUtils.convertToCurrPrecision(
+ queryInfo.getEndTime(), TimeUnit.MILLISECONDS));
+ }
+ columnBuilders[4].writeInt(QueryId.getDataNodeId(queryInfo.getQueryId()));
+ columnBuilders[5].writeFloat(queryInfo.getCostTime());
+ columnBuilders[6].writeBinary(BytesUtils.valueOf(queryInfo.getStatement()));
+ columnBuilders[7].writeBinary(BytesUtils.valueOf(queryInfo.getUser()));
+ columnBuilders[8].writeBinary(BytesUtils.valueOf(queryInfo.getClientHost()));
+ resultBuilder.declarePosition();
+ nextConsumedIndex++;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return nextConsumedIndex < queriesInfo.size();
+ }
+ }
+
+ private static class QueriesCostsHistogramSupplier extends TsBlockSupplier {
+ protected int nextConsumedIndex;
Review Comment:
```suggestion
private int nextConsumedIndex;
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -722,4 +763,233 @@ public DataNodeLocationSupplierFactory.DataNodeLocationSupplier getDataNodeLocat
public ExecutorService getDispatchExecutor() {
return dispatchExecutor;
}
+
+ /** record query info in memory data structure */
+ public void recordCurrentQueries(
+ String queryId,
+ long startTime,
+ long endTime,
+ long costTimeInNs,
+ String statement,
+ String user,
+ String clientHost) {
+ if (CONFIG.getQueryCostStatWindow() <= 0) {
+ return;
+ }
+
+ if (queryId == null) {
+ // fast Last query API executeFastLastDataQueryForOnePrefixPath will enter this
+ queryId = queryIdGenerator.createNextQueryId().getId();
+ }
+
+ // ns -> s
+ float costTimeInSeconds = costTimeInNs * 1e-9f;
+
+ QueryInfo queryInfo =
+ new QueryInfo(queryId, startTime, endTime, costTimeInSeconds, statement, user, clientHost);
+
+ while (!coordinatorMemoryBlock.allocate(RamUsageEstimator.sizeOfObject(queryInfo))) {
+ // try to release memory from the head of queue
+ QueryInfo queryInfoToRelease = currentQueriesInfo.poll();
+ if (queryInfoToRelease == null) {
+ // no element in the queue and the memory is still not enough, skip this record
+ return;
+ } else {
+ // release memory and unrecord in histogram
+ coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfoToRelease));
+ unrecordInHistogram(queryInfoToRelease.costTime);
+ }
+ }
+
+ currentQueriesInfo.addLast(queryInfo);
+ recordInHistogram(costTimeInSeconds);
+ }
+
+ private void recordInHistogram(float costTimeInSeconds) {
+ int bucket = (int) costTimeInSeconds;
+ if (bucket < 60) {
+ currentQueriesCostHistogram[bucket].getAndIncrement();
+ } else {
+ currentQueriesCostHistogram[60].getAndIncrement();
+ }
+ }
+
+ private void unrecordInHistogram(float costTimeInSeconds) {
+ int bucket = (int) costTimeInSeconds;
+ if (bucket < 60) {
+ currentQueriesCostHistogram[bucket].getAndDecrement();
+ } else {
+ currentQueriesCostHistogram[60].getAndDecrement();
+ }
+ }
+
+ private void clearExpiredQueriesInfoTask() {
+ int queryCostStatWindow = CONFIG.getQueryCostStatWindow();
+ if (queryCostStatWindow <= 0) {
+ return;
+ }
+
+ // the QueryInfo smaller than expired time will be cleared
+ long expiredTime = System.currentTimeMillis() - queryCostStatWindow * 60 * 1_000;
+ // peek head, the head QueryInfo is in the time window, return directly
+ QueryInfo queryInfo = currentQueriesInfo.peekFirst();
+ if (queryInfo.endTime >= expiredTime) {
+ return;
+ }
+
+ queryInfo = currentQueriesInfo.poll();
+ while (queryInfo != null) {
+ if (queryInfo.endTime < expiredTime) {
+ // out of time window, clear queryInfo
+ coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfo));
+ unrecordInHistogram(queryInfo.costTime);
+ queryInfo = currentQueriesInfo.poll();
+ } else {
+ // the head of the queue is not expired, add back
+ currentQueriesInfo.addFirst(queryInfo);
+ // there is no more candidate to clear
+ return;
+ }
+ }
+ }
+
+ public List<StatedQueriesInfo> getCurrentQueriesInfo() {
+ List<IQueryExecution> runningQueries = getAllQueryExecutions();
+ Set<String> runningQueryIdSet =
+ runningQueries.stream().map(IQueryExecution::getQueryId).collect(Collectors.toSet());
+ List<StatedQueriesInfo> result = new ArrayList<>();
+
+ // add History queries (satisfy the time window) info
+ Iterator<QueryInfo> historyQueriesIterator = currentQueriesInfo.iterator();
+ Set<String> repetitionQueryIdSet = new HashSet<>();
+ long currentTime = System.currentTimeMillis();
+ long needRecordTime = currentTime - CONFIG.getQueryCostStatWindow() * 60 * 1_000;
Review Comment:
Use a long literal here so the minutes-to-milliseconds conversion does not overflow int arithmetic.
```suggestion
long needRecordTime = currentTime - CONFIG.getQueryCostStatWindow() * 60 * 1_000L;
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java:
##########
@@ -201,14 +206,11 @@ protected void constructLine() {
final IQueryExecution queryExecution =
queryExecutions.get(nextConsumedIndex);
if (queryExecution.getSQLDialect().equals(IClientSession.SqlDialect.TABLE)) {
- final String[] splits = queryExecution.getQueryId().split("_");
- final int dataNodeId = Integer.parseInt(splits[splits.length - 1]);
-
columnBuilders[0].writeBinary(BytesUtils.valueOf(queryExecution.getQueryId()));
columnBuilders[1].writeLong(
TimestampPrecisionUtils.convertToCurrPrecision(
queryExecution.getStartExecutionTime(), TimeUnit.MILLISECONDS));
- columnBuilders[2].writeInt(dataNodeId);
+ columnBuilders[2].writeInt(QueryId.getDataNodeId(queryExecution.getQueryId()));
Review Comment:
Use the static DataNode id field here instead of re-parsing it from the query id.
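For illustration, the call site could then read the cached value, e.g. via a static accessor on `QueryId` (name hypothetical):
```java
// Hypothetical static accessor returning the cached local DataNode id (see the QueryId comment above).
columnBuilders[2].writeInt(QueryId.getLocalDataNodeId());
```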
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -625,8 +654,20 @@ public void cleanupQueryExecution(
try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) {
LOGGER.debug("[CleanUpQuery]]");
queryExecution.stopAndCleanup(t);
+ boolean isUserQuery = queryExecution.isQuery() && queryExecution.isUserQuery();
+ if (isUserQuery) {
+ recordCurrentQueries(
+ queryExecution.getQueryId(),
+ queryExecution.getStartExecutionTime(),
+ queryExecution.getStartExecutionTime()
+ + queryExecution.getTotalExecutionTime() / 1_000_000L,
Review Comment:
```suggestion
System.currentTimeMillis(),
```
endTime should be the current time; endTime - startTime does not need to equal `getTotalExecutionTime`, since `getTotalExecutionTime` is the sum of all RPC latencies.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -722,4 +763,233 @@ public DataNodeLocationSupplierFactory.DataNodeLocationSupplier getDataNodeLocat
public ExecutorService getDispatchExecutor() {
return dispatchExecutor;
}
+
+ /** record query info in memory data structure */
+ public void recordCurrentQueries(
+ String queryId,
+ long startTime,
+ long endTime,
+ long costTimeInNs,
+ String statement,
+ String user,
+ String clientHost) {
+ if (CONFIG.getQueryCostStatWindow() <= 0) {
+ return;
+ }
+
+ if (queryId == null) {
+ // fast Last query API executeFastLastDataQueryForOnePrefixPath will enter this
+ queryId = queryIdGenerator.createNextQueryId().getId();
+ }
+
+ // ns -> s
+ float costTimeInSeconds = costTimeInNs * 1e-9f;
+
+ QueryInfo queryInfo =
+ new QueryInfo(queryId, startTime, endTime, costTimeInSeconds, statement, user, clientHost);
+
+ while (!coordinatorMemoryBlock.allocate(RamUsageEstimator.sizeOfObject(queryInfo))) {
+ // try to release memory from the head of queue
+ QueryInfo queryInfoToRelease = currentQueriesInfo.poll();
+ if (queryInfoToRelease == null) {
+ // no element in the queue and the memory is still not enough, skip this record
+ return;
+ } else {
+ // release memory and unrecord in histogram
+ coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfoToRelease));
+ unrecordInHistogram(queryInfoToRelease.costTime);
+ }
+ }
+
+ currentQueriesInfo.addLast(queryInfo);
+ recordInHistogram(costTimeInSeconds);
+ }
+
+ private void recordInHistogram(float costTimeInSeconds) {
+ int bucket = (int) costTimeInSeconds;
+ if (bucket < 60) {
+ currentQueriesCostHistogram[bucket].getAndIncrement();
+ } else {
+ currentQueriesCostHistogram[60].getAndIncrement();
+ }
+ }
+
+ private void unrecordInHistogram(float costTimeInSeconds) {
+ int bucket = (int) costTimeInSeconds;
+ if (bucket < 60) {
+ currentQueriesCostHistogram[bucket].getAndDecrement();
+ } else {
+ currentQueriesCostHistogram[60].getAndDecrement();
+ }
+ }
+
+ private void clearExpiredQueriesInfoTask() {
+ int queryCostStatWindow = CONFIG.getQueryCostStatWindow();
+ if (queryCostStatWindow <= 0) {
+ return;
+ }
+
+ // the QueryInfo smaller than expired time will be cleared
+ long expiredTime = System.currentTimeMillis() - queryCostStatWindow * 60 * 1_000;
+ // peek head, the head QueryInfo is in the time window, return directly
+ QueryInfo queryInfo = currentQueriesInfo.peekFirst();
+ if (queryInfo.endTime >= expiredTime) {
+ return;
+ }
+
+ queryInfo = currentQueriesInfo.poll();
+ while (queryInfo != null) {
+ if (queryInfo.endTime < expiredTime) {
+ // out of time window, clear queryInfo
+ coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfo));
+ unrecordInHistogram(queryInfo.costTime);
+ queryInfo = currentQueriesInfo.poll();
+ } else {
+ // the head of the queue is not expired, add back
+ currentQueriesInfo.addFirst(queryInfo);
+ // there is no more candidate to clear
+ return;
+ }
+ }
+ }
+
+ public List<StatedQueriesInfo> getCurrentQueriesInfo() {
+ List<IQueryExecution> runningQueries = getAllQueryExecutions();
+ Set<String> runningQueryIdSet =
+ runningQueries.stream().map(IQueryExecution::getQueryId).collect(Collectors.toSet());
+ List<StatedQueriesInfo> result = new ArrayList<>();
+
+ // add History queries (satisfy the time window) info
+ Iterator<QueryInfo> historyQueriesIterator = currentQueriesInfo.iterator();
+ Set<String> repetitionQueryIdSet = new HashSet<>();
+ long currentTime = System.currentTimeMillis();
+ long needRecordTime = currentTime - CONFIG.getQueryCostStatWindow() * 60 * 1_000;
+ while (historyQueriesIterator.hasNext()) {
+ QueryInfo queryInfo = historyQueriesIterator.next();
+ if (queryInfo.endTime < needRecordTime) {
+ // out of time window, ignore it
+ } else {
+ if (runningQueryIdSet.contains(queryInfo.queryId)) {
+ repetitionQueryIdSet.add(queryInfo.queryId);
+ }
+ result.add(new StatedQueriesInfo(QueryState.FINISHED, queryInfo));
+ }
+ }
+
+ // add Running queries info after remove the repetitions which has recorded in History queries
+ result.addAll(
+ runningQueries.stream()
+ .filter(queryExecution -> !repetitionQueryIdSet.contains(queryExecution.getQueryId()))
+ .map(
+ queryExecution ->
+ new StatedQueriesInfo(
+ QueryState.RUNNING,
+ queryExecution.getQueryId(),
+ queryExecution.getStartExecutionTime(),
+ DEFAULT_END_TIME,
+ (currentTime - queryExecution.getStartExecutionTime()) / 1000,
+ queryExecution.getExecuteSQL().orElse("UNKNOWN"),
+ queryExecution.getUser(),
+ queryExecution.getClientHostname()))
+ .collect(Collectors.toList()));
+ return result;
+ }
+
+ public int[] getCurrentQueriesCostHistogram() {
+ return Arrays.stream(currentQueriesCostHistogram).mapToInt(AtomicInteger::get).toArray();
+ }
+
+ public static class QueryInfo {
Review Comment:
Have `QueryInfo` implement `Accountable`.
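A hedged sketch of what that enables at the enqueue/release sites, assuming `Accountable` exposes a `ramBytesUsed()` method (see the sizing comment further down):
```java
// Sketch: with QueryInfo implementing Accountable, the coordinator accounts exact retained
// sizes instead of estimating with RamUsageEstimator.sizeOfObject on every enqueue/release.
long size = queryInfo.ramBytesUsed();
while (!coordinatorMemoryBlock.allocate(size)) {
  QueryInfo queryInfoToRelease = currentQueriesInfo.poll();
  if (queryInfoToRelease == null) {
    return; // still not enough memory and nothing left to evict, skip this record
  }
  coordinatorMemoryBlock.release(queryInfoToRelease.ramBytesUsed());
  unrecordInHistogram(queryInfoToRelease.costTime);
}
currentQueriesInfo.addLast(queryInfo);
recordInHistogram(costTimeInSeconds);
```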
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -722,4 +763,233 @@ public DataNodeLocationSupplierFactory.DataNodeLocationSupplier getDataNodeLocat
public ExecutorService getDispatchExecutor() {
return dispatchExecutor;
}
+
+ /** record query info in memory data structure */
+ public void recordCurrentQueries(
+ String queryId,
+ long startTime,
+ long endTime,
+ long costTimeInNs,
+ String statement,
+ String user,
+ String clientHost) {
+ if (CONFIG.getQueryCostStatWindow() <= 0) {
+ return;
+ }
+
+ if (queryId == null) {
+ // fast Last query API executeFastLastDataQueryForOnePrefixPath will enter this
+ queryId = queryIdGenerator.createNextQueryId().getId();
+ }
+
+ // ns -> s
+ float costTimeInSeconds = costTimeInNs * 1e-9f;
+
+ QueryInfo queryInfo =
+ new QueryInfo(queryId, startTime, endTime, costTimeInSeconds, statement, user, clientHost);
+
+ while (!coordinatorMemoryBlock.allocate(RamUsageEstimator.sizeOfObject(queryInfo))) {
+ // try to release memory from the head of queue
+ QueryInfo queryInfoToRelease = currentQueriesInfo.poll();
+ if (queryInfoToRelease == null) {
+ // no element in the queue and the memory is still not enough, skip this record
+ return;
+ } else {
+ // release memory and unrecord in histogram
+ coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfoToRelease));
+ unrecordInHistogram(queryInfoToRelease.costTime);
+ }
+ }
+
+ currentQueriesInfo.addLast(queryInfo);
+ recordInHistogram(costTimeInSeconds);
+ }
+
+ private void recordInHistogram(float costTimeInSeconds) {
+ int bucket = (int) costTimeInSeconds;
+ if (bucket < 60) {
+ currentQueriesCostHistogram[bucket].getAndIncrement();
+ } else {
+ currentQueriesCostHistogram[60].getAndIncrement();
+ }
+ }
+
+ private void unrecordInHistogram(float costTimeInSeconds) {
+ int bucket = (int) costTimeInSeconds;
+ if (bucket < 60) {
+ currentQueriesCostHistogram[bucket].getAndDecrement();
+ } else {
+ currentQueriesCostHistogram[60].getAndDecrement();
+ }
+ }
+
+ private void clearExpiredQueriesInfoTask() {
+ int queryCostStatWindow = CONFIG.getQueryCostStatWindow();
+ if (queryCostStatWindow <= 0) {
+ return;
+ }
+
+ // the QueryInfo smaller than expired time will be cleared
+ long expiredTime = System.currentTimeMillis() - queryCostStatWindow * 60 * 1_000;
+ // peek head, the head QueryInfo is in the time window, return directly
+ QueryInfo queryInfo = currentQueriesInfo.peekFirst();
+ if (queryInfo.endTime >= expiredTime) {
+ return;
+ }
+
+ queryInfo = currentQueriesInfo.poll();
+ while (queryInfo != null) {
+ if (queryInfo.endTime < expiredTime) {
+ // out of time window, clear queryInfo
+ coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfo));
+ unrecordInHistogram(queryInfo.costTime);
+ queryInfo = currentQueriesInfo.poll();
+ } else {
+ // the head of the queue is not expired, add back
+ currentQueriesInfo.addFirst(queryInfo);
+ // there is no more candidate to clear
+ return;
+ }
+ }
+ }
+
+ public List<StatedQueriesInfo> getCurrentQueriesInfo() {
+ List<IQueryExecution> runningQueries = getAllQueryExecutions();
+ Set<String> runningQueryIdSet =
+ runningQueries.stream().map(IQueryExecution::getQueryId).collect(Collectors.toSet());
+ List<StatedQueriesInfo> result = new ArrayList<>();
+
+ // add History queries (satisfy the time window) info
+ Iterator<QueryInfo> historyQueriesIterator = currentQueriesInfo.iterator();
+ Set<String> repetitionQueryIdSet = new HashSet<>();
+ long currentTime = System.currentTimeMillis();
+ long needRecordTime = currentTime - CONFIG.getQueryCostStatWindow() * 60 * 1_000;
+ while (historyQueriesIterator.hasNext()) {
+ QueryInfo queryInfo = historyQueriesIterator.next();
+ if (queryInfo.endTime < needRecordTime) {
+ // out of time window, ignore it
+ } else {
+ if (runningQueryIdSet.contains(queryInfo.queryId)) {
+ repetitionQueryIdSet.add(queryInfo.queryId);
+ }
+ result.add(new StatedQueriesInfo(QueryState.FINISHED, queryInfo));
+ }
+ }
+
+ // add Running queries info after remove the repetitions which has recorded in History queries
+ result.addAll(
+ runningQueries.stream()
+ .filter(queryExecution -> !repetitionQueryIdSet.contains(queryExecution.getQueryId()))
+ .map(
+ queryExecution ->
+ new StatedQueriesInfo(
+ QueryState.RUNNING,
+ queryExecution.getQueryId(),
+ queryExecution.getStartExecutionTime(),
+ DEFAULT_END_TIME,
+ (currentTime - queryExecution.getStartExecutionTime()) / 1000,
+ queryExecution.getExecuteSQL().orElse("UNKNOWN"),
+ queryExecution.getUser(),
+ queryExecution.getClientHostname()))
+ .collect(Collectors.toList()));
+ return result;
+ }
+
+ public int[] getCurrentQueriesCostHistogram() {
+ return Arrays.stream(currentQueriesCostHistogram).mapToInt(AtomicInteger::get).toArray();
+ }
+
+ public static class QueryInfo {
Review Comment:
Define `private static final long INSTANCE_SIZE = shallowSizeOfInstance(QueryInfo.class);` and then add the sizes of `statement`, `user`, and `clientHost` for each instance; refer to `TimeSeriesMetadataCacheKey`. A string's size can be calculated with `sizeOfCharArray(XXX.length())`.
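A minimal sketch of that sizing, following the `TimeSeriesMetadataCacheKey` pattern; the `Accountable`/`ramBytesUsed()` shape and the constructor are assumptions, while `shallowSizeOfInstance` and `sizeOfCharArray` are the helpers named above:
```java
public static class QueryInfo implements Accountable {
  private static final long INSTANCE_SIZE =
      RamUsageEstimator.shallowSizeOfInstance(QueryInfo.class);

  private final String queryId;
  private final long startTime;
  private final long endTime;
  private final float costTime;
  private final String statement;
  private final String user;
  private final String clientHost;

  QueryInfo(String queryId, long startTime, long endTime, float costTime,
      String statement, String user, String clientHost) {
    this.queryId = queryId;
    this.startTime = startTime;
    this.endTime = endTime;
    this.costTime = costTime;
    this.statement = statement;
    this.user = user;
    this.clientHost = clientHost;
  }

  @Override
  public long ramBytesUsed() {
    // Fixed shallow size of the instance plus the char arrays behind the variable-length String fields.
    return INSTANCE_SIZE
        + RamUsageEstimator.sizeOfCharArray(statement.length())
        + RamUsageEstimator.sizeOfCharArray(user.length())
        + RamUsageEstimator.sizeOfCharArray(clientHost.length());
  }
}
```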
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -625,8 +654,20 @@ public void cleanupQueryExecution(
try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) {
LOGGER.debug("[CleanUpQuery]]");
queryExecution.stopAndCleanup(t);
+ boolean isUserQuery = queryExecution.isQuery() && queryExecution.isUserQuery();
+ if (isUserQuery) {
+ recordCurrentQueries(
+ queryExecution.getQueryId(),
+ queryExecution.getStartExecutionTime(),
+ queryExecution.getStartExecutionTime()
+ + queryExecution.getTotalExecutionTime() / 1_000_000L,
+ queryExecution.getTotalExecutionTime(),
+ queryExecution.getExecuteSQL().orElse("UNKNOWN"),
Review Comment:
Better to use the `ContentOfQuerySupplier` here, the same one the slow query recorder uses (see the `Supplier<String>` suggestion below), so the statement text is built lazily.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -722,4 +763,233 @@ public DataNodeLocationSupplierFactory.DataNodeLocationSupplier getDataNodeLocat
public ExecutorService getDispatchExecutor() {
return dispatchExecutor;
}
+
+ /** record query info in memory data structure */
+ public void recordCurrentQueries(
+ String queryId,
+ long startTime,
+ long endTime,
+ long costTimeInNs,
+ String statement,
+ String user,
+ String clientHost) {
+ if (CONFIG.getQueryCostStatWindow() <= 0) {
+ return;
+ }
+
+ if (queryId == null) {
+ // fast Last query API executeFastLastDataQueryForOnePrefixPath will enter this
+ queryId = queryIdGenerator.createNextQueryId().getId();
+ }
+
+ // ns -> s
+ float costTimeInSeconds = costTimeInNs * 1e-9f;
+
+ QueryInfo queryInfo =
+ new QueryInfo(queryId, startTime, endTime, costTimeInSeconds, statement, user, clientHost);
+
+ while (!coordinatorMemoryBlock.allocate(RamUsageEstimator.sizeOfObject(queryInfo))) {
+ // try to release memory from the head of queue
+ QueryInfo queryInfoToRelease = currentQueriesInfo.poll();
+ if (queryInfoToRelease == null) {
+ // no element in the queue and the memory is still not enough, skip this record
+ return;
+ } else {
+ // release memory and unrecord in histogram
+ coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfoToRelease));
+ unrecordInHistogram(queryInfoToRelease.costTime);
+ }
+ }
+
+ currentQueriesInfo.addLast(queryInfo);
+ recordInHistogram(costTimeInSeconds);
+ }
+
+ private void recordInHistogram(float costTimeInSeconds) {
+ int bucket = (int) costTimeInSeconds;
+ if (bucket < 60) {
+ currentQueriesCostHistogram[bucket].getAndIncrement();
+ } else {
+ currentQueriesCostHistogram[60].getAndIncrement();
+ }
+ }
+
+ private void unrecordInHistogram(float costTimeInSeconds) {
+ int bucket = (int) costTimeInSeconds;
+ if (bucket < 60) {
+ currentQueriesCostHistogram[bucket].getAndDecrement();
+ } else {
+ currentQueriesCostHistogram[60].getAndDecrement();
+ }
+ }
+
+ private void clearExpiredQueriesInfoTask() {
+ int queryCostStatWindow = CONFIG.getQueryCostStatWindow();
+ if (queryCostStatWindow <= 0) {
+ return;
+ }
+
+ // the QueryInfo smaller than expired time will be cleared
+ long expiredTime = System.currentTimeMillis() - queryCostStatWindow * 60 * 1_000;
Review Comment:
```suggestion
long expiredTime = System.currentTimeMillis() - queryCostStatWindow * 60 * 1_000L;
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -722,4 +763,233 @@ public DataNodeLocationSupplierFactory.DataNodeLocationSupplier getDataNodeLocat
public ExecutorService getDispatchExecutor() {
return dispatchExecutor;
}
+
+ /** record query info in memory data structure */
+ public void recordCurrentQueries(
+ String queryId,
+ long startTime,
+ long endTime,
+ long costTimeInNs,
+ String statement,
Review Comment:
```suggestion
Supplier<String> contentOfQuerySupplier,
```
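A runnable toy illustrating the point of the lazy supplier: the statement text is only built when the stat window is enabled (class and constant names here are stand-ins, not the PR's API):
```java
import java.util.function.Supplier;

final class LazyStatementDemo {
  // Stand-in for CONFIG.getQueryCostStatWindow(); 0 disables recording.
  private static final int QUERY_COST_STAT_WINDOW_MIN = 0;

  static void recordCurrentQueries(String queryId, Supplier<String> contentOfQuerySupplier) {
    if (QUERY_COST_STAT_WINDOW_MIN <= 0) {
      // Supplier is never invoked, so the (possibly large) SQL string is never materialized.
      return;
    }
    String statement = contentOfQuerySupplier.get();
    System.out.println(queryId + " -> " + statement);
  }

  public static void main(String[] args) {
    recordCurrentQueries("20250101_000000_00000_1", () -> "select * from table1 where time > 0");
  }
}
```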
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java:
##########
@@ -1181,4 +1183,140 @@ public boolean hasNext() {
return sessionConnectionIterator.hasNext();
}
}
+
+ private static class CurrentQueriesSupplier extends TsBlockSupplier {
+ protected int nextConsumedIndex;
Review Comment:
```suggestion
private int nextConsumedIndex;
```
##########
iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template:
##########
@@ -1080,6 +1080,11 @@ max_tsblock_line_number=1000
# Datatype: long
slow_query_threshold=10000
+# Time window threshold(min) for record of history queries.
+# effectiveMode: hot_reload
+# Datatype: int
+query_cost_stat_window=0
Review Comment:
```suggestion
# Privilege: SYSTEM
query_cost_stat_window=0
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java:
##########
@@ -722,4 +763,233 @@ public DataNodeLocationSupplierFactory.DataNodeLocationSupplier getDataNodeLocat
public ExecutorService getDispatchExecutor() {
return dispatchExecutor;
}
+
+ /** record query info in memory data structure */
+ public void recordCurrentQueries(
+ String queryId,
+ long startTime,
+ long endTime,
+ long costTimeInNs,
+ String statement,
+ String user,
+ String clientHost) {
+ if (CONFIG.getQueryCostStatWindow() <= 0) {
+ return;
+ }
+
+ if (queryId == null) {
+ // fast Last query API executeFastLastDataQueryForOnePrefixPath will enter this
+ queryId = queryIdGenerator.createNextQueryId().getId();
+ }
+
+ // ns -> s
+ float costTimeInSeconds = costTimeInNs * 1e-9f;
+
+ QueryInfo queryInfo =
+ new QueryInfo(queryId, startTime, endTime, costTimeInSeconds, statement, user, clientHost);
+
+ while (!coordinatorMemoryBlock.allocate(RamUsageEstimator.sizeOfObject(queryInfo))) {
+ // try to release memory from the head of queue
+ QueryInfo queryInfoToRelease = currentQueriesInfo.poll();
+ if (queryInfoToRelease == null) {
+ // no element in the queue and the memory is still not enough, skip this record
+ return;
+ } else {
+ // release memory and unrecord in histogram
+ coordinatorMemoryBlock.release(RamUsageEstimator.sizeOfObject(queryInfoToRelease));
+ unrecordInHistogram(queryInfoToRelease.costTime);
+ }
+ }
+
+ currentQueriesInfo.addLast(queryInfo);
+ recordInHistogram(costTimeInSeconds);
+ }
+
+ private void recordInHistogram(float costTimeInSeconds) {
+ int bucket = (int) costTimeInSeconds;
+ if (bucket < 60) {
+ currentQueriesCostHistogram[bucket].getAndIncrement();
+ } else {
+ currentQueriesCostHistogram[60].getAndIncrement();
+ }
+ }
+
+ private void unrecordInHistogram(float costTimeInSeconds) {
+ int bucket = (int) costTimeInSeconds;
+ if (bucket < 60) {
+ currentQueriesCostHistogram[bucket].getAndDecrement();
+ } else {
+ currentQueriesCostHistogram[60].getAndDecrement();
+ }
+ }
+
+ private void clearExpiredQueriesInfoTask() {
+ int queryCostStatWindow = CONFIG.getQueryCostStatWindow();
+ if (queryCostStatWindow <= 0) {
+ return;
+ }
+
+ // the QueryInfo smaller than expired time will be cleared
+ long expiredTime = System.currentTimeMillis() - queryCostStatWindow * 60 * 1_000;
+ // peek head, the head QueryInfo is in the time window, return directly
+ QueryInfo queryInfo = currentQueriesInfo.peekFirst();
+ if (queryInfo.endTime >= expiredTime) {
Review Comment:
```suggestion
if (queryInfo == null || queryInfo.endTime >= expiredTime) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]