Copilot commented on code in PR #13640:
URL: https://github.com/apache/iotdb/pull/13640#discussion_r2188811440
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java:
##########
@@ -89,49 +128,91 @@ long getRemainingEvents() {
final PipeRemainingTimeRateAverageTime
pipeRemainingTimeCommitRateAverageTime =
PipeConfig.getInstance().getPipeRemainingTimeCommitRateAverageTime();
- final double invocationValue = collectInvocationHistogram.getMean();
- // Do not take heartbeat event into account
- final double totalDataRegionWriteEventCount =
- (dataRegionExtractors.stream()
- .map(IoTDBDataRegionExtractor::getEventCount)
- .reduce(Integer::sum)
- .orElse(0)
- - dataRegionExtractors.stream()
-
.map(IoTDBDataRegionExtractor::getPipeHeartbeatEventCount)
- .reduce(Integer::sum)
- .orElse(0))
- * Math.max(invocationValue, 1)
+ // data region historical event
+ final double totalDataRegionWriteHistoricalEventCount =
+ dataRegionExtractors.stream()
+
.map(IoTDBDataRegionExtractor::getHistoricalTsFileInsertionEventCount)
+ .reduce(Integer::sum)
+ .orElse(0)
+ dataRegionProcessors.stream()
- .map(processorSubtask -> processorSubtask.getEventCount(true))
+ .map(
+ processor ->
+ processor.getEventCount(
+ filter(pipeName, NEED_TO_COMMIT_RATE,
IS_DATA_REGION_HISTORICAL_EVENT)))
.reduce(Integer::sum)
.orElse(0)
+ dataRegionConnectors.stream()
- .map(connectorSubtask ->
connectorSubtask.getEventCount(pipeName))
+ .map(
+ connector ->
+ connector.getEventCount(
+ filter(pipeName, NEED_TO_COMMIT_RATE,
IS_DATA_REGION_HISTORICAL_EVENT)))
+ .reduce(Integer::sum)
+ .orElse(0);
+
+ dataRegionHistoricalEventCommitMeter.updateAndGet(
+ meter -> {
+ if (Objects.nonNull(meter)) {
+ lastDataRegionHistoricalEventCommitSmoothingValue =
+ pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
+ }
+ return meter;
+ });
+
+ final double dataRegionHistoricalEventRemainingTime;
+ if (totalDataRegionWriteHistoricalEventCount <= 0) {
+ dataRegionHistoricalEventRemainingTime = 0;
+ } else {
+ dataRegionHistoricalEventRemainingTime =
+ lastDataRegionHistoricalEventCommitSmoothingValue <= 0
+ ? Double.MAX_VALUE
+ : totalDataRegionWriteHistoricalEventCount
+ / lastDataRegionHistoricalEventCommitSmoothingValue;
+ }
+
+ // data region realtime event
+ final double totalDataRegionWriteRealtimeEventCount =
+ dataRegionExtractors.stream()
+ .map(
+ extractor ->
+ extractor.getRealtimeEventCount(filter(pipeName,
NEED_TO_COMMIT_RATE)))
.reduce(Integer::sum)
.orElse(0)
- - dataRegionConnectors.stream()
- .map(PipeConnectorSubtask::getPipeHeartbeatEventCount)
+ + dataRegionProcessors.stream()
+ .map(
+ processor ->
+ processor.getEventCount(
+ filter(pipeName, NEED_TO_COMMIT_RATE,
IS_DATA_REGION_REALTIME_EVENT)))
+ .reduce(Integer::sum)
+ .orElse(0)
+ + dataRegionConnectors.stream()
+ .map(
+ connector ->
+ connector.getEventCount(
+ filter(pipeName, NEED_TO_COMMIT_RATE,
IS_DATA_REGION_REALTIME_EVENT)))
.reduce(Integer::sum)
.orElse(0);
- dataRegionCommitMeter.updateAndGet(
+ dataRegionRealtimeEventCommitMeter.updateAndGet(
meter -> {
if (Objects.nonNull(meter)) {
- lastDataRegionCommitSmoothingValue =
+ lastDataRegionRealtimeEventCommitSmoothingValue =
pipeRemainingTimeCommitRateAverageTime.getMeterRate(meter);
}
return meter;
});
- final double dataRegionRemainingTime;
- if (totalDataRegionWriteEventCount <= 0) {
- dataRegionRemainingTime = 0;
+
+ final double dataRegionRealtimeEventRemainingTime;
+ if (totalDataRegionWriteRealtimeEventCount <= 0) {
+ dataRegionRealtimeEventRemainingTime = 0;
} else {
- dataRegionRemainingTime =
- lastDataRegionCommitSmoothingValue <= 0
- ? Double.MAX_VALUE
- : totalDataRegionWriteEventCount /
lastDataRegionCommitSmoothingValue;
+ dataRegionRealtimeEventRemainingTime =
+ lastDataRegionRealtimeEventCommitSmoothingValue <= 0
+ ? 0 // NOTE HERE
Review Comment:
Returning 0 when the realtime commit‐rate smoothing value is ≤ 0
underestimates remaining time. For symmetry with historical events, consider
returning Double.MAX_VALUE to indicate unbounded remaining time.
```suggestion
? Double.MAX_VALUE // Indicate unbounded remaining time
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]