Copilot commented on code in PR #3909: URL: https://github.com/apache/hertzbeat/pull/3909#discussion_r2613634223
########## hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/controller/AnalysisController.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.manager.controller; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.hertzbeat.analysis.algorithm.PredictionResult; +import org.apache.hertzbeat.analysis.service.AnalysisService; +import org.apache.hertzbeat.common.constants.CommonConstants; +import org.apache.hertzbeat.common.entity.dto.Message; +import org.apache.hertzbeat.common.entity.dto.MetricsHistoryData; +import org.apache.hertzbeat.common.entity.dto.Value; +import org.apache.hertzbeat.common.entity.job.Job; +import org.apache.hertzbeat.common.entity.job.Metrics; +import org.apache.hertzbeat.common.entity.manager.Monitor; +import org.apache.hertzbeat.common.entity.warehouse.History; +import 
org.apache.hertzbeat.manager.service.AppService; +import org.apache.hertzbeat.manager.service.MonitorService; +import org.apache.hertzbeat.warehouse.service.MetricsDataService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * Analysis and Prediction Controller + */ +@Tag(name = "Analysis Prediction API") +@RestController +@RequestMapping(path = "/api/analysis") +@Slf4j +public class AnalysisController { + + @Autowired + private MetricsDataService metricsDataService; + + @Autowired + private AnalysisService analysisService; + + @Autowired + private AppService appService; + + @Autowired + private MonitorService monitorService; + + @GetMapping("/predict/{instance}/{app}/{metrics}/{metric}") + @Operation(summary = "Predict metric data", description = "Forecast future metric data based on history") + public Message<Map<String, List<PredictionResult>>> getMetricPrediction( + @Parameter(description = "Monitor Instance", example = "127.0.0.1") @PathVariable String instance, + @Parameter(description = "App Type", example = "linux") @PathVariable String app, + @Parameter(description = "Metrics Name", example = "cpu") @PathVariable String metrics, + @Parameter(description = "Metric Name", example = "usage") @PathVariable String metric, + @Parameter(description = "History time range", example = "6h") @RequestParam(required = false) String history + ) { + // 1. Context Analysis + // We separate "User View Window" (history) from "Model Training Window" (dbQueryTime). + // User view: what the user sees (e.g., 1h). We should predict ~20% of this length. + // Training window: what the model needs (e.g., 3 days). We force this to be large. 
+ + String userViewTime = history != null ? history : "6h"; + long userViewMillis = parseSimpleDuration(userViewTime); + if (userViewMillis <= 0) { + userViewMillis = 6 * 60 * 60 * 1000L; // default 6h + } + + // 2. Determine Training Window (Strategy: Max(3 days, 100 * interval)) + String dbQueryTime = "6h"; // initial fallback + try { + List<Monitor> monitors = monitorService.getAppMonitors(app); + if (monitors != null) { + Optional<Monitor> monitorOpt = monitors.stream() + .filter(m -> instance.equals(m.getInstance())) + .findFirst(); + + if (monitorOpt.isPresent()) { + Integer intervals = monitorOpt.get().getIntervals(); + if (intervals != null && intervals > 0) { + long minSeconds = 259200L; // 3 days + long intervalBasedSeconds = intervals * 100L; + long finalSeconds = Math.max(minSeconds, intervalBasedSeconds); + dbQueryTime = finalSeconds + "s"; + log.debug("[Predict] Training window calculated: {} for interval: {}s", dbQueryTime, intervals); + } + } + } + } catch (Exception e) { + log.warn("[Predict] Failed to calculate dynamic history for instance: {}, using default.", instance, e); + } + + // 3. 
Validate Metric Type + Optional<Job> jobOptional = appService.getAppDefineOption(app); + if (jobOptional.isEmpty()) { + return Message.fail(CommonConstants.FAIL_CODE, "Application definition not found: " + app); + } + Job job = jobOptional.get(); + + Optional<Metrics> metricsDefineOpt = job.getMetrics().stream() + .filter(m -> m.getName().equals(metrics)) + .findFirst(); + if (metricsDefineOpt.isEmpty()) { + return Message.fail(CommonConstants.FAIL_CODE, "Metrics group not found: " + metrics); + } + + Optional<Metrics.Field> fieldDefineOpt = metricsDefineOpt.get().getFields().stream() + .filter(f -> f.getField().equals(metric)) + .findFirst(); + if (fieldDefineOpt.isEmpty()) { + return Message.fail(CommonConstants.FAIL_CODE, "Metric field not found: " + metric); + } + + if (fieldDefineOpt.get().getType() != CommonConstants.TYPE_NUMBER) { + return Message.fail(CommonConstants.FAIL_CODE, "Prediction is only supported for numeric metrics."); + } + + // 4. Get Training Data (Using the Long Window) + MetricsHistoryData historyData = metricsDataService.getMetricHistoryData( + instance, app, metrics, metric, dbQueryTime, false); + + if (historyData == null || historyData.getValues() == null || historyData.getValues().isEmpty()) { + return Message.success(Collections.emptyMap()); + } + + Map<String, List<PredictionResult>> resultMap = new HashMap<>(); + + // Capture effectively final variable for lambda + final long viewWindowMillis = userViewMillis; + + // 5. 
Iterate and Forecast + historyData.getValues().forEach((rowInstance, values) -> { + if (values == null || values.size() < 10) { + return; + } + List<History> validHistory = new ArrayList<>(); + + for (Value v : values) { + try { + if (v.getOrigin() != null && !CommonConstants.NULL_VALUE.equals(v.getOrigin())) { + double val = Double.parseDouble(v.getOrigin()); + validHistory.add(History.builder() + .time(v.getTime()) + .dou(val) + .metricType(CommonConstants.TYPE_NUMBER) + .build()); + } + } catch (NumberFormatException ignored) {} + } + + if (validHistory.size() > 10) { + long step = estimateStep(validHistory); + + // Smart Calculation of Forecast Count + // Rule: Predict 1/5 of the user's current view window + long forecastDuration = viewWindowMillis / 5; + int dynamicCount = (int) (forecastDuration / step); + + // Bounds checking + if (dynamicCount < 5) dynamicCount = 5; // Minimum 5 points + if (dynamicCount > 2000) dynamicCount = 2000; // Safety cap + + log.debug("[Predict] View: {}ms, Forecast: {}ms ({} steps), Step: {}ms", + viewWindowMillis, forecastDuration, dynamicCount, step); + + List<PredictionResult> forecast = analysisService.forecast(validHistory, step, dynamicCount); + Review Comment: The forecast count is not exposed through the API. The frontend sends a 'forecastCount' query parameter set to 10, but the controller declares no such request parameter and always computes dynamicCount from the view window, so the frontend's value is silently ignored. Consider either removing the unused parameter from the frontend service call or exposing it as an optional API parameter to allow clients to override the calculated value when needed. ########## hertzbeat-analysis/src/main/java/org/apache/hertzbeat/analysis/algorithm/NLinearModel.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.analysis.algorithm; + +import java.util.Arrays; +import org.apache.commons.math3.linear.Array2DRowRealMatrix; +import org.apache.commons.math3.linear.ArrayRealVector; +import org.apache.commons.math3.linear.LUDecomposition; +import org.apache.commons.math3.linear.RealMatrix; +import org.apache.commons.math3.linear.RealVector; +import org.apache.commons.math3.linear.SingularMatrixException; +import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation; + +/** + * Industrial-grade Robust NLinear Model. + * Uses Ridge Regression (L2 Regularization) to prevent overfitting and handle singular matrices. + * Note: This class is stateful and not thread-safe. A new instance should be created for each prediction task. + */ +public class NLinearModel { + + private static final int LOOKBACK_WINDOW = 30; + + /** + * Ridge regularization parameter (Lambda). + * A small positive value ensures the matrix is always invertible. + */ + private static final double RIDGE_LAMBDA = 0.01; Review Comment: The RIDGE_LAMBDA constant is hardcoded to 0.01. Similar to the lookback window, this regularization parameter could benefit from being configurable to allow tuning for different data characteristics (e.g., noisy vs. 
smooth data may need different regularization strengths). Consider making this configurable through constructor parameters or application properties. ########## hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/controller/AnalysisController.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hertzbeat.manager.controller; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.hertzbeat.analysis.algorithm.PredictionResult; +import org.apache.hertzbeat.analysis.service.AnalysisService; +import org.apache.hertzbeat.common.constants.CommonConstants; +import org.apache.hertzbeat.common.entity.dto.Message; +import org.apache.hertzbeat.common.entity.dto.MetricsHistoryData; +import org.apache.hertzbeat.common.entity.dto.Value; +import org.apache.hertzbeat.common.entity.job.Job; +import org.apache.hertzbeat.common.entity.job.Metrics; +import org.apache.hertzbeat.common.entity.manager.Monitor; +import org.apache.hertzbeat.common.entity.warehouse.History; +import org.apache.hertzbeat.manager.service.AppService; +import org.apache.hertzbeat.manager.service.MonitorService; +import org.apache.hertzbeat.warehouse.service.MetricsDataService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * Analysis and Prediction Controller + */ +@Tag(name = "Analysis Prediction API") +@RestController +@RequestMapping(path = "/api/analysis") +@Slf4j +public class AnalysisController { + + @Autowired + private MetricsDataService metricsDataService; + + @Autowired + private AnalysisService analysisService; + + @Autowired + private AppService appService; + + @Autowired + private MonitorService monitorService; + + 
@GetMapping("/predict/{instance}/{app}/{metrics}/{metric}") + @Operation(summary = "Predict metric data", description = "Forecast future metric data based on history") + public Message<Map<String, List<PredictionResult>>> getMetricPrediction( + @Parameter(description = "Monitor Instance", example = "127.0.0.1") @PathVariable String instance, + @Parameter(description = "App Type", example = "linux") @PathVariable String app, + @Parameter(description = "Metrics Name", example = "cpu") @PathVariable String metrics, + @Parameter(description = "Metric Name", example = "usage") @PathVariable String metric, + @Parameter(description = "History time range", example = "6h") @RequestParam(required = false) String history + ) { + // 1. Context Analysis + // We separate "User View Window" (history) from "Model Training Window" (dbQueryTime). + // User view: what the user sees (e.g., 1h). We should predict ~20% of this length. + // Training window: what the model needs (e.g., 3 days). We force this to be large. + + String userViewTime = history != null ? history : "6h"; + long userViewMillis = parseSimpleDuration(userViewTime); + if (userViewMillis <= 0) { + userViewMillis = 6 * 60 * 60 * 1000L; // default 6h + } + + // 2. 
Determine Training Window (Strategy: Max(3 days, 100 * interval)) + String dbQueryTime = "6h"; // initial fallback + try { + List<Monitor> monitors = monitorService.getAppMonitors(app); + if (monitors != null) { + Optional<Monitor> monitorOpt = monitors.stream() + .filter(m -> instance.equals(m.getInstance())) + .findFirst(); + + if (monitorOpt.isPresent()) { + Integer intervals = monitorOpt.get().getIntervals(); + if (intervals != null && intervals > 0) { + long minSeconds = 259200L; // 3 days + long intervalBasedSeconds = intervals * 100L; + long finalSeconds = Math.max(minSeconds, intervalBasedSeconds); + dbQueryTime = finalSeconds + "s"; + log.debug("[Predict] Training window calculated: {} for interval: {}s", dbQueryTime, intervals); + } + } + } + } catch (Exception e) { + log.warn("[Predict] Failed to calculate dynamic history for instance: {}, using default.", instance, e); + } + + // 3. Validate Metric Type + Optional<Job> jobOptional = appService.getAppDefineOption(app); + if (jobOptional.isEmpty()) { + return Message.fail(CommonConstants.FAIL_CODE, "Application definition not found: " + app); + } + Job job = jobOptional.get(); + + Optional<Metrics> metricsDefineOpt = job.getMetrics().stream() + .filter(m -> m.getName().equals(metrics)) + .findFirst(); + if (metricsDefineOpt.isEmpty()) { + return Message.fail(CommonConstants.FAIL_CODE, "Metrics group not found: " + metrics); + } + + Optional<Metrics.Field> fieldDefineOpt = metricsDefineOpt.get().getFields().stream() + .filter(f -> f.getField().equals(metric)) + .findFirst(); + if (fieldDefineOpt.isEmpty()) { + return Message.fail(CommonConstants.FAIL_CODE, "Metric field not found: " + metric); + } + + if (fieldDefineOpt.get().getType() != CommonConstants.TYPE_NUMBER) { + return Message.fail(CommonConstants.FAIL_CODE, "Prediction is only supported for numeric metrics."); + } + + // 4. 
Get Training Data (Using the Long Window) + MetricsHistoryData historyData = metricsDataService.getMetricHistoryData( + instance, app, metrics, metric, dbQueryTime, false); + + if (historyData == null || historyData.getValues() == null || historyData.getValues().isEmpty()) { + return Message.success(Collections.emptyMap()); + } + + Map<String, List<PredictionResult>> resultMap = new HashMap<>(); + + // Capture effectively final variable for lambda + final long viewWindowMillis = userViewMillis; + + // 5. Iterate and Forecast + historyData.getValues().forEach((rowInstance, values) -> { + if (values == null || values.size() < 10) { + return; + } + List<History> validHistory = new ArrayList<>(); + + for (Value v : values) { + try { + if (v.getOrigin() != null && !CommonConstants.NULL_VALUE.equals(v.getOrigin())) { + double val = Double.parseDouble(v.getOrigin()); + validHistory.add(History.builder() + .time(v.getTime()) + .dou(val) + .metricType(CommonConstants.TYPE_NUMBER) + .build()); + } + } catch (NumberFormatException ignored) {} + } + + if (validHistory.size() > 10) { + long step = estimateStep(validHistory); + + // Smart Calculation of Forecast Count + // Rule: Predict 1/5 of the user's current view window + long forecastDuration = viewWindowMillis / 5; + int dynamicCount = (int) (forecastDuration / step); + + // Bounds checking + if (dynamicCount < 5) dynamicCount = 5; // Minimum 5 points + if (dynamicCount > 2000) dynamicCount = 2000; // Safety cap + + log.debug("[Predict] View: {}ms, Forecast: {}ms ({} steps), Step: {}ms", + viewWindowMillis, forecastDuration, dynamicCount, step); + + List<PredictionResult> forecast = analysisService.forecast(validHistory, step, dynamicCount); + + if (!forecast.isEmpty()) { + resultMap.put(rowInstance, forecast); + } + } + }); + + return Message.success(resultMap); + } + + /** + * Simple parser for standard time tokens (1h, 6h, 1d, 1w, 4w, 12w) + */ + private long parseSimpleDuration(String timeToken) { + if (timeToken == 
null) return 0; + try { + String lower = timeToken.toLowerCase(); + if (lower.endsWith("s")) return Long.parseLong(lower.replace("s", "")) * 1000; + if (lower.endsWith("m")) return Long.parseLong(lower.replace("m", "")) * 60 * 1000; + if (lower.endsWith("h")) return Long.parseLong(lower.replace("h", "")) * 3600 * 1000; + if (lower.endsWith("d")) return Long.parseLong(lower.replace("d", "")) * 24 * 3600 * 1000; + if (lower.endsWith("w")) return Long.parseLong(lower.replace("w", "")) * 7 * 24 * 3600 * 1000; + } catch (NumberFormatException e) { + return 0; + } + return 0; Review Comment: The parseSimpleDuration method does not guard against arithmetic overflow. A number string too large for a long, such as "99999999999999999999h", already throws NumberFormatException, which is caught and yields 0; however, a value that fits in a long can still overflow silently when multiplied by the unit factor (for example, "99999999999999w" wraps around instead of failing), producing a meaningless or even negative duration. Add validation to check the input string length and numeric range before parsing, and consider limiting the maximum allowed duration value. Note that bounding the digit count alone is not sufficient: for the "d" and "w" units, a 12-digit value multiplied by 86,400,000 or 604,800,000 still exceeds Long.MAX_VALUE, so use Math.multiplyExact (catching ArithmeticException) or compare the parsed value against MAX_DURATION_MS divided by the unit factor before multiplying.
```suggestion // Define maximum allowed duration: 10 years in milliseconds final long MAX_DURATION_MS = 10L * 365 * 24 * 3600 * 1000; try { String lower = timeToken.toLowerCase().trim(); if (lower.length() < 2) return 0; char unit = lower.charAt(lower.length() - 1); String numberPart = lower.substring(0, lower.length() - 1); // Only allow digits, and limit length to 12 digits (trillion) if (!numberPart.matches("\\d{1,12}")) return 0; long value = Long.parseLong(numberPart); long durationMs = 0; switch (unit) { case 's': durationMs = value * 1000L; break; case 'm': durationMs = value * 60L * 1000L; break; case 'h': durationMs = value * 3600L * 1000L; break; case 'd': durationMs = value * 24L * 3600L * 1000L; break; case 'w': durationMs = value * 7L * 24L * 3600L * 1000L; break; default: return 0; } if (durationMs < 0 || durationMs > MAX_DURATION_MS) { return 0; } return durationMs; } catch (NumberFormatException e) { return 0; } ``` ########## web-app/src/app/routes/monitor/monitor-data-chart/monitor-data-chart.component.ts: ########## @@ -315,6 +322,171 @@ export class MonitorDataChartComponent implements OnInit, OnDestroy { ); } + loadPredictionData(isAuto: boolean = false) { + if (!isAuto) { + this.loading = 'Forecasting...'; Review Comment: The hardcoded loading message 'Forecasting...' should be internationalized using the i18n service. This is inconsistent with other loading messages in the component which use translation keys like 'monitor.detail.chart.data-loading'. Consider adding a new translation key such as 'monitor.detail.chart.forecasting' to the i18n files and using it here. ```suggestion this.loading = this.i18nSvc.fanyi('monitor.detail.chart.forecasting'); ``` ########## hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/controller/AnalysisController.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.manager.controller; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.hertzbeat.analysis.algorithm.PredictionResult; +import org.apache.hertzbeat.analysis.service.AnalysisService; +import org.apache.hertzbeat.common.constants.CommonConstants; +import org.apache.hertzbeat.common.entity.dto.Message; +import org.apache.hertzbeat.common.entity.dto.MetricsHistoryData; +import org.apache.hertzbeat.common.entity.dto.Value; +import org.apache.hertzbeat.common.entity.job.Job; +import org.apache.hertzbeat.common.entity.job.Metrics; +import org.apache.hertzbeat.common.entity.manager.Monitor; +import org.apache.hertzbeat.common.entity.warehouse.History; +import org.apache.hertzbeat.manager.service.AppService; +import org.apache.hertzbeat.manager.service.MonitorService; +import org.apache.hertzbeat.warehouse.service.MetricsDataService; +import org.springframework.beans.factory.annotation.Autowired; +import 
org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * Analysis and Prediction Controller + */ +@Tag(name = "Analysis Prediction API") +@RestController +@RequestMapping(path = "/api/analysis") +@Slf4j +public class AnalysisController { + + @Autowired + private MetricsDataService metricsDataService; + + @Autowired + private AnalysisService analysisService; + + @Autowired + private AppService appService; + + @Autowired + private MonitorService monitorService; + + @GetMapping("/predict/{instance}/{app}/{metrics}/{metric}") + @Operation(summary = "Predict metric data", description = "Forecast future metric data based on history") + public Message<Map<String, List<PredictionResult>>> getMetricPrediction( + @Parameter(description = "Monitor Instance", example = "127.0.0.1") @PathVariable String instance, + @Parameter(description = "App Type", example = "linux") @PathVariable String app, + @Parameter(description = "Metrics Name", example = "cpu") @PathVariable String metrics, + @Parameter(description = "Metric Name", example = "usage") @PathVariable String metric, + @Parameter(description = "History time range", example = "6h") @RequestParam(required = false) String history + ) { + // 1. Context Analysis + // We separate "User View Window" (history) from "Model Training Window" (dbQueryTime). + // User view: what the user sees (e.g., 1h). We should predict ~20% of this length. + // Training window: what the model needs (e.g., 3 days). We force this to be large. + + String userViewTime = history != null ? history : "6h"; + long userViewMillis = parseSimpleDuration(userViewTime); + if (userViewMillis <= 0) { + userViewMillis = 6 * 60 * 60 * 1000L; // default 6h + } + + // 2. 
Determine Training Window (Strategy: Max(3 days, 100 * interval)) + String dbQueryTime = "6h"; // initial fallback + try { + List<Monitor> monitors = monitorService.getAppMonitors(app); + if (monitors != null) { + Optional<Monitor> monitorOpt = monitors.stream() + .filter(m -> instance.equals(m.getInstance())) + .findFirst(); + + if (monitorOpt.isPresent()) { + Integer intervals = monitorOpt.get().getIntervals(); + if (intervals != null && intervals > 0) { + long minSeconds = 259200L; // 3 days + long intervalBasedSeconds = intervals * 100L; + long finalSeconds = Math.max(minSeconds, intervalBasedSeconds); + dbQueryTime = finalSeconds + "s"; + log.debug("[Predict] Training window calculated: {} for interval: {}s", dbQueryTime, intervals); + } + } + } + } catch (Exception e) { + log.warn("[Predict] Failed to calculate dynamic history for instance: {}, using default.", instance, e); + } + + // 3. Validate Metric Type + Optional<Job> jobOptional = appService.getAppDefineOption(app); + if (jobOptional.isEmpty()) { + return Message.fail(CommonConstants.FAIL_CODE, "Application definition not found: " + app); + } + Job job = jobOptional.get(); + + Optional<Metrics> metricsDefineOpt = job.getMetrics().stream() + .filter(m -> m.getName().equals(metrics)) + .findFirst(); + if (metricsDefineOpt.isEmpty()) { + return Message.fail(CommonConstants.FAIL_CODE, "Metrics group not found: " + metrics); + } + + Optional<Metrics.Field> fieldDefineOpt = metricsDefineOpt.get().getFields().stream() + .filter(f -> f.getField().equals(metric)) + .findFirst(); + if (fieldDefineOpt.isEmpty()) { + return Message.fail(CommonConstants.FAIL_CODE, "Metric field not found: " + metric); + } + + if (fieldDefineOpt.get().getType() != CommonConstants.TYPE_NUMBER) { + return Message.fail(CommonConstants.FAIL_CODE, "Prediction is only supported for numeric metrics."); + } + + // 4. 
Get Training Data (Using the Long Window) + MetricsHistoryData historyData = metricsDataService.getMetricHistoryData( + instance, app, metrics, metric, dbQueryTime, false); + + if (historyData == null || historyData.getValues() == null || historyData.getValues().isEmpty()) { + return Message.success(Collections.emptyMap()); + } + + Map<String, List<PredictionResult>> resultMap = new HashMap<>(); + + // Capture effectively final variable for lambda + final long viewWindowMillis = userViewMillis; + + // 5. Iterate and Forecast + historyData.getValues().forEach((rowInstance, values) -> { + if (values == null || values.size() < 10) { + return; + } + List<History> validHistory = new ArrayList<>(); + + for (Value v : values) { + try { + if (v.getOrigin() != null && !CommonConstants.NULL_VALUE.equals(v.getOrigin())) { + double val = Double.parseDouble(v.getOrigin()); + validHistory.add(History.builder() + .time(v.getTime()) + .dou(val) + .metricType(CommonConstants.TYPE_NUMBER) + .build()); + } + } catch (NumberFormatException ignored) {} + } + + if (validHistory.size() > 10) { + long step = estimateStep(validHistory); + + // Smart Calculation of Forecast Count + // Rule: Predict 1/5 of the user's current view window + long forecastDuration = viewWindowMillis / 5; + int dynamicCount = (int) (forecastDuration / step); + + // Bounds checking + if (dynamicCount < 5) dynamicCount = 5; // Minimum 5 points + if (dynamicCount > 2000) dynamicCount = 2000; // Safety cap + + log.debug("[Predict] View: {}ms, Forecast: {}ms ({} steps), Step: {}ms", + viewWindowMillis, forecastDuration, dynamicCount, step); + + List<PredictionResult> forecast = analysisService.forecast(validHistory, step, dynamicCount); + + if (!forecast.isEmpty()) { + resultMap.put(rowInstance, forecast); + } + } + }); + + return Message.success(resultMap); + } + + /** + * Simple parser for standard time tokens (1h, 6h, 1d, 1w, 4w, 12w) + */ + private long parseSimpleDuration(String timeToken) { + if (timeToken == 
null) return 0; + try { + String lower = timeToken.toLowerCase(); + if (lower.endsWith("s")) return Long.parseLong(lower.replace("s", "")) * 1000; + if (lower.endsWith("m")) return Long.parseLong(lower.replace("m", "")) * 60 * 1000; + if (lower.endsWith("h")) return Long.parseLong(lower.replace("h", "")) * 3600 * 1000; + if (lower.endsWith("d")) return Long.parseLong(lower.replace("d", "")) * 24 * 3600 * 1000; + if (lower.endsWith("w")) return Long.parseLong(lower.replace("w", "")) * 7 * 24 * 3600 * 1000; + } catch (NumberFormatException e) { + return 0; + } + return 0; + } + + /** + * Estimate time step from history data (simple median) Review Comment: The Javadoc comment for estimateStep could be more comprehensive. It should document what "simple median" means in this context (actually returns the median of time differences), the return value unit (milliseconds), default behavior when insufficient data exists, and why only the first 100 points are checked. ```suggestion * Estimates the time step (interval) between consecutive history data points by calculating * the median of the time differences between their timestamps. * <p> * Only the first 100 data points are considered for efficiency and because this is typically * sufficient for a reliable estimation. * <p> * If there are fewer than 2 data points, or if all time differences are non-positive, * a default value of 60000 milliseconds (1 minute) is returned. 
* * @param data the list of history data points, assumed to be sorted by time ascending * @return the estimated time step in milliseconds (median of positive time differences) ``` ########## web-app/src/app/routes/monitor/monitor-data-chart/monitor-data-chart.component.ts: ########## @@ -315,6 +322,171 @@ export class MonitorDataChartComponent implements OnInit, OnDestroy { ); } + loadPredictionData(isAuto: boolean = false) { + if (!isAuto) { + this.loading = 'Forecasting...'; + } + + // CRITICAL FIX: Pass 'this.timePeriod' so backend knows the view context (1h, 6h, 1d, 1w) + // and can calculate appropriate forecast duration (e.g. 1/5 of view length). + // We pass null for predictTime to rely on backend auto-calculation. + let predictionData$ = this.monitorSvc + .getMonitorMetricsPredictionData(this.instance, this.app, this.metrics, this.metric, this.timePeriod) + .pipe( + finalize(() => { + if (!isAuto) { + this.loading = null; + } + predictionData$.unsubscribe(); + }) + ) + .subscribe( + (message: any) => { + if (message.code === 0 && message.data) { + // Get current series to append forecast data + const currentSeries = (this.eChartOption.series as any[]) || []; + const newSeries = [...currentSeries]; + + let hasData = false; + // Get translations for chart legend + const forecastName = this.i18nSvc.fanyi('monitor.detail.chart.forecast'); + // Separate names for clarity + const confidenceLowerName = this.i18nSvc.fanyi('monitor.detail.chart.confidence.lower'); + const confidenceUpperName = this.i18nSvc.fanyi('monitor.detail.chart.confidence.upper'); + // Fallback if translation keys don't exist + const lowerName = confidenceLowerName.includes('monitor.detail') ? 'Lower Bound' : confidenceLowerName; + const upperName = confidenceUpperName.includes('monitor.detail') ? 'Upper Bound' : confidenceUpperName; + + // Iterate over prediction results + for (const [instance, results] of Object.entries(message.data)) { Review Comment: Unused variable instance. 
```suggestion for (const results of Object.values(message.data)) { ``` ########## web-app/src/app/service/monitor.service.ts: ########## @@ -181,6 +182,25 @@ export class MonitorService { return this.http.get<Message<any>>(`${monitor_uri}/${instance}/metric/${metricFull}`, options); } + /** + * Get metric prediction data + */ + public getMonitorMetricsPredictionData( + instance: string, + app: string, + metrics: string, + metric: string, + history: string + ): Observable<Message<any>> { + let httpParams = new HttpParams(); + httpParams = httpParams.append('forecastCount', 10); Review Comment: The parameter 'forecastCount' is hardcoded to 10 but is not used by the backend, which calculates the forecast count dynamically. This parameter name is misleading since its value is ignored. Consider either removing this unused parameter or renaming it to reflect its actual usage (or lack thereof), or making the backend respect this value. ```suggestion // Removed unused 'forecastCount' parameter as it is ignored by the backend. ``` ########## web-app/src/app/routes/monitor/monitor-data-chart/monitor-data-chart.component.ts: ########## @@ -315,6 +322,171 @@ export class MonitorDataChartComponent implements OnInit, OnDestroy { ); } + loadPredictionData(isAuto: boolean = false) { + if (!isAuto) { + this.loading = 'Forecasting...'; + } + + // CRITICAL FIX: Pass 'this.timePeriod' so backend knows the view context (1h, 6h, 1d, 1w) + // and can calculate appropriate forecast duration (e.g. 1/5 of view length). + // We pass null for predictTime to rely on backend auto-calculation. 
+ let predictionData$ = this.monitorSvc + .getMonitorMetricsPredictionData(this.instance, this.app, this.metrics, this.metric, this.timePeriod) + .pipe( + finalize(() => { + if (!isAuto) { + this.loading = null; + } + predictionData$.unsubscribe(); + }) + ) + .subscribe( + (message: any) => { + if (message.code === 0 && message.data) { + // Get current series to append forecast data + const currentSeries = (this.eChartOption.series as any[]) || []; + const newSeries = [...currentSeries]; + + let hasData = false; + // Get translations for chart legend + const forecastName = this.i18nSvc.fanyi('monitor.detail.chart.forecast'); + // Separate names for clarity + const confidenceLowerName = this.i18nSvc.fanyi('monitor.detail.chart.confidence.lower'); + const confidenceUpperName = this.i18nSvc.fanyi('monitor.detail.chart.confidence.upper'); + // Fallback if translation keys don't exist + const lowerName = confidenceLowerName.includes('monitor.detail') ? 'Lower Bound' : confidenceLowerName; + const upperName = confidenceUpperName.includes('monitor.detail') ? 'Upper Bound' : confidenceUpperName; + + // Iterate over prediction results + for (const [instance, results] of Object.entries(message.data)) { + const predictions = results as any[]; + if (!predictions || predictions.length === 0) continue; + hasData = true; + + // Parse prediction data + const forecastLineData = []; + const lowerBoundData = []; + const diffData = []; + const upperBoundData = []; + + for (const p of predictions) { + const val = p.forecast; + const upper = p.upperBound; + const lower = p.lowerBound; + const t = p.time; + + forecastLineData.push([t, val]); + // Basic data for lower bound + lowerBoundData.push([t, lower]); + // Diff data for the stacked band (upper - lower) + diffData.push([t, upper - lower]); + // Actual upper bound data for separate invisible series + upperBoundData.push([t, upper]); + } + + // 1. 
Lower Bound Series (Base of the stack) + // This series serves two purposes: + // a) It is the bottom edge of the band. + // b) It shows the correct "Lower Bound" value in the tooltip. + newSeries.push({ + name: lowerName, + type: 'line', + data: lowerBoundData, + stack: `confidence-band`, + symbol: 'none', + lineStyle: { opacity: 0 }, + areaStyle: { opacity: 0 }, + // Show tooltip for this series + tooltip: { show: true }, + silent: false // Allow hover to trigger tooltip + }); + + // 2. Band Width Series (Stacked on top of Lower Bound) + // This series draws the filled area. + // It MUST be hidden from tooltip because its value is the "difference", not the absolute value. + newSeries.push({ + name: 'Confidence Band', // Internal name + type: 'line', + data: diffData, + stack: `confidence-band`, + symbol: 'none', + lineStyle: { opacity: 0 }, + areaStyle: { + opacity: 0.3, + color: '#A6C8FF' + }, + // CRITICAL: Hide this series from tooltip so users don't see the diff value + tooltip: { show: false }, + silent: true // Ignore mouse events so it doesn't trigger tooltip + }); + + // 3. Upper Bound Series (Invisible, Non-Stacked) + // This series is purely for the Tooltip. It shows the correct "Upper Bound" value. + newSeries.push({ + name: upperName, + type: 'line', + data: upperBoundData, + // Do NOT stack this series + stack: null, + symbol: 'none', + lineStyle: { opacity: 0 }, // Invisible line + areaStyle: { opacity: 0 }, // No area + // Show tooltip for this series + tooltip: { show: true }, + silent: false + }); + + // 4. 
Forecast Main Line + newSeries.push({ + name: `${forecastName}`, + type: 'line', + data: forecastLineData, + smooth: true, + lineStyle: { + type: 'dashed', + width: 2, + color: '#ffa318' + }, + itemStyle: { + opacity: 0, + color: '#ffa318' + }, + symbol: 'none', + z: 5 + }); + } + + if (hasData) { + this.eChartOption.series = newSeries; + if (this.echartsInstance) { + this.echartsInstance.setOption({ + series: newSeries + }); + } + if (!isAuto) { + this.notifySvc.success(this.i18nSvc.fanyi('common.notify.success'), 'Forecast data loaded.'); + } + } else { + if (!isAuto) { + this.notifySvc.warning(this.i18nSvc.fanyi('common.notify.warning'), 'Insufficient history data for prediction.'); + } + } + } else { + console.warn(`Prediction failed or no data returned: ${message.msg}`); + if (!isAuto) { + this.notifySvc.error(this.i18nSvc.fanyi('common.notify.error'), message.msg || 'Prediction failed.'); + } + } + }, + error => { + console.error(error); + if (!isAuto) { + this.notifySvc.error(this.i18nSvc.fanyi('common.notify.error'), error.msg || 'Network error during prediction.'); Review Comment: The error message 'Prediction failed.' is hardcoded and should use i18n translation. Similarly, the message at line 484 'Network error during prediction.' should also be internationalized. These messages should be added to the i18n JSON files to support localization. 
```suggestion this.notifySvc.error(this.i18nSvc.fanyi('common.notify.error'), message.msg || this.i18nSvc.fanyi('monitor.prediction.failed')); } } }, error => { console.error(error); if (!isAuto) { this.notifySvc.error(this.i18nSvc.fanyi('common.notify.error'), error.msg || this.i18nSvc.fanyi('monitor.prediction.network_error')); ``` ########## web-app/src/app/routes/monitor/monitor-data-chart/monitor-data-chart.component.ts: ########## @@ -315,6 +322,171 @@ export class MonitorDataChartComponent implements OnInit, OnDestroy { ); } + loadPredictionData(isAuto: boolean = false) { + if (!isAuto) { + this.loading = 'Forecasting...'; + } + + // CRITICAL FIX: Pass 'this.timePeriod' so backend knows the view context (1h, 6h, 1d, 1w) + // and can calculate appropriate forecast duration (e.g. 1/5 of view length). + // We pass null for predictTime to rely on backend auto-calculation. + let predictionData$ = this.monitorSvc + .getMonitorMetricsPredictionData(this.instance, this.app, this.metrics, this.metric, this.timePeriod) + .pipe( + finalize(() => { + if (!isAuto) { + this.loading = null; + } + predictionData$.unsubscribe(); + }) + ) + .subscribe( + (message: any) => { + if (message.code === 0 && message.data) { + // Get current series to append forecast data + const currentSeries = (this.eChartOption.series as any[]) || []; + const newSeries = [...currentSeries]; + + let hasData = false; + // Get translations for chart legend + const forecastName = this.i18nSvc.fanyi('monitor.detail.chart.forecast'); + // Separate names for clarity + const confidenceLowerName = this.i18nSvc.fanyi('monitor.detail.chart.confidence.lower'); + const confidenceUpperName = this.i18nSvc.fanyi('monitor.detail.chart.confidence.upper'); + // Fallback if translation keys don't exist + const lowerName = confidenceLowerName.includes('monitor.detail') ? 'Lower Bound' : confidenceLowerName; + const upperName = confidenceUpperName.includes('monitor.detail') ? 
'Upper Bound' : confidenceUpperName; Review Comment: The fallback names 'Lower Bound' and 'Upper Bound' should also be internationalized. Rather than falling back to hardcoded English strings when a translation is missing, consider a more consistent approach that resolves the fallback through default translation keys as well. ########## web-app/src/app/routes/monitor/monitor-data-chart/monitor-data-chart.component.ts: ########## @@ -315,6 +322,171 @@ export class MonitorDataChartComponent implements OnInit, OnDestroy { ); } + loadPredictionData(isAuto: boolean = false) { + if (!isAuto) { + this.loading = 'Forecasting...'; + } + + // CRITICAL FIX: Pass 'this.timePeriod' so backend knows the view context (1h, 6h, 1d, 1w) + // and can calculate appropriate forecast duration (e.g. 1/5 of view length). + // We pass null for predictTime to rely on backend auto-calculation. + let predictionData$ = this.monitorSvc + .getMonitorMetricsPredictionData(this.instance, this.app, this.metrics, this.metric, this.timePeriod) + .pipe( + finalize(() => { + if (!isAuto) { + this.loading = null; + } + predictionData$.unsubscribe(); + }) + ) + .subscribe( + (message: any) => { + if (message.code === 0 && message.data) { + // Get current series to append forecast data + const currentSeries = (this.eChartOption.series as any[]) || []; + const newSeries = [...currentSeries]; + + let hasData = false; + // Get translations for chart legend + const forecastName = this.i18nSvc.fanyi('monitor.detail.chart.forecast'); + // Separate names for clarity + const confidenceLowerName = this.i18nSvc.fanyi('monitor.detail.chart.confidence.lower'); + const confidenceUpperName = this.i18nSvc.fanyi('monitor.detail.chart.confidence.upper'); + // Fallback if translation keys don't exist + const lowerName = confidenceLowerName.includes('monitor.detail') ? 'Lower Bound' : confidenceLowerName; + const upperName = confidenceUpperName.includes('monitor.detail') ?
'Upper Bound' : confidenceUpperName; + + // Iterate over prediction results + for (const [instance, results] of Object.entries(message.data)) { + const predictions = results as any[]; + if (!predictions || predictions.length === 0) continue; + hasData = true; + + // Parse prediction data + const forecastLineData = []; + const lowerBoundData = []; + const diffData = []; + const upperBoundData = []; + + for (const p of predictions) { + const val = p.forecast; + const upper = p.upperBound; + const lower = p.lowerBound; + const t = p.time; + + forecastLineData.push([t, val]); + // Basic data for lower bound + lowerBoundData.push([t, lower]); + // Diff data for the stacked band (upper - lower) + diffData.push([t, upper - lower]); + // Actual upper bound data for separate invisible series + upperBoundData.push([t, upper]); + } + + // 1. Lower Bound Series (Base of the stack) + // This series serves two purposes: + // a) It is the bottom edge of the band. + // b) It shows the correct "Lower Bound" value in the tooltip. + newSeries.push({ + name: lowerName, + type: 'line', + data: lowerBoundData, + stack: `confidence-band`, + symbol: 'none', + lineStyle: { opacity: 0 }, + areaStyle: { opacity: 0 }, + // Show tooltip for this series + tooltip: { show: true }, + silent: false // Allow hover to trigger tooltip + }); + + // 2. Band Width Series (Stacked on top of Lower Bound) + // This series draws the filled area. + // It MUST be hidden from tooltip because its value is the "difference", not the absolute value. + newSeries.push({ + name: 'Confidence Band', // Internal name + type: 'line', + data: diffData, + stack: `confidence-band`, + symbol: 'none', + lineStyle: { opacity: 0 }, + areaStyle: { + opacity: 0.3, + color: '#A6C8FF' + }, + // CRITICAL: Hide this series from tooltip so users don't see the diff value + tooltip: { show: false }, + silent: true // Ignore mouse events so it doesn't trigger tooltip + }); + + // 3. 
Upper Bound Series (Invisible, Non-Stacked) + // This series is purely for the Tooltip. It shows the correct "Upper Bound" value. + newSeries.push({ + name: upperName, + type: 'line', + data: upperBoundData, + // Do NOT stack this series + stack: null, + symbol: 'none', + lineStyle: { opacity: 0 }, // Invisible line + areaStyle: { opacity: 0 }, // No area + // Show tooltip for this series + tooltip: { show: true }, + silent: false + }); + + // 4. Forecast Main Line + newSeries.push({ + name: `${forecastName}`, + type: 'line', + data: forecastLineData, + smooth: true, + lineStyle: { + type: 'dashed', + width: 2, + color: '#ffa318' + }, + itemStyle: { + opacity: 0, + color: '#ffa318' + }, + symbol: 'none', + z: 5 + }); + } + + if (hasData) { + this.eChartOption.series = newSeries; + if (this.echartsInstance) { + this.echartsInstance.setOption({ + series: newSeries + }); + } + if (!isAuto) { + this.notifySvc.success(this.i18nSvc.fanyi('common.notify.success'), 'Forecast data loaded.'); + } + } else { + if (!isAuto) { + this.notifySvc.warning(this.i18nSvc.fanyi('common.notify.warning'), 'Insufficient history data for prediction.'); Review Comment: The notification messages 'Forecast data loaded.' and 'Insufficient history data for prediction.' are hardcoded strings that should be internationalized. These should be added to the i18n JSON files and retrieved using the i18nSvc.fanyi method to maintain consistency with the rest of the application. 
```suggestion this.notifySvc.success(this.i18nSvc.fanyi('common.notify.success'), this.i18nSvc.fanyi('monitor.detail.notify.forecastLoaded')); } } else { if (!isAuto) { this.notifySvc.warning(this.i18nSvc.fanyi('common.notify.warning'), this.i18nSvc.fanyi('monitor.detail.notify.insufficientHistory')); ``` ########## hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/controller/AnalysisController.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hertzbeat.manager.controller; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.hertzbeat.analysis.algorithm.PredictionResult; +import org.apache.hertzbeat.analysis.service.AnalysisService; +import org.apache.hertzbeat.common.constants.CommonConstants; +import org.apache.hertzbeat.common.entity.dto.Message; +import org.apache.hertzbeat.common.entity.dto.MetricsHistoryData; +import org.apache.hertzbeat.common.entity.dto.Value; +import org.apache.hertzbeat.common.entity.job.Job; +import org.apache.hertzbeat.common.entity.job.Metrics; +import org.apache.hertzbeat.common.entity.manager.Monitor; +import org.apache.hertzbeat.common.entity.warehouse.History; +import org.apache.hertzbeat.manager.service.AppService; +import org.apache.hertzbeat.manager.service.MonitorService; +import org.apache.hertzbeat.warehouse.service.MetricsDataService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * Analysis and Prediction Controller + */ +@Tag(name = "Analysis Prediction API") +@RestController +@RequestMapping(path = "/api/analysis") +@Slf4j +public class AnalysisController { + + @Autowired + private MetricsDataService metricsDataService; + + @Autowired + private AnalysisService analysisService; + + @Autowired + private AppService appService; + + @Autowired + private MonitorService monitorService; + + 
@GetMapping("/predict/{instance}/{app}/{metrics}/{metric}") + @Operation(summary = "Predict metric data", description = "Forecast future metric data based on history") + public Message<Map<String, List<PredictionResult>>> getMetricPrediction( + @Parameter(description = "Monitor Instance", example = "127.0.0.1") @PathVariable String instance, + @Parameter(description = "App Type", example = "linux") @PathVariable String app, + @Parameter(description = "Metrics Name", example = "cpu") @PathVariable String metrics, + @Parameter(description = "Metric Name", example = "usage") @PathVariable String metric, + @Parameter(description = "History time range", example = "6h") @RequestParam(required = false) String history + ) { + // 1. Context Analysis + // We separate "User View Window" (history) from "Model Training Window" (dbQueryTime). + // User view: what the user sees (e.g., 1h). We should predict ~20% of this length. + // Training window: what the model needs (e.g., 3 days). We force this to be large. + + String userViewTime = history != null ? history : "6h"; + long userViewMillis = parseSimpleDuration(userViewTime); + if (userViewMillis <= 0) { + userViewMillis = 6 * 60 * 60 * 1000L; // default 6h + } + + // 2. 
Determine Training Window (Strategy: Max(3 days, 100 * interval)) + String dbQueryTime = "6h"; // initial fallback + try { + List<Monitor> monitors = monitorService.getAppMonitors(app); + if (monitors != null) { + Optional<Monitor> monitorOpt = monitors.stream() + .filter(m -> instance.equals(m.getInstance())) + .findFirst(); + + if (monitorOpt.isPresent()) { + Integer intervals = monitorOpt.get().getIntervals(); + if (intervals != null && intervals > 0) { + long minSeconds = 259200L; // 3 days + long intervalBasedSeconds = intervals * 100L; + long finalSeconds = Math.max(minSeconds, intervalBasedSeconds); + dbQueryTime = finalSeconds + "s"; + log.debug("[Predict] Training window calculated: {} for interval: {}s", dbQueryTime, intervals); + } Review Comment: The method calls monitorService.getAppMonitors(app) which likely retrieves all monitors for the application type, then filters in-memory to find the matching instance. This could be inefficient for applications with many monitors. Consider adding a direct lookup method like monitorService.getMonitorByInstance(app, instance) to avoid retrieving and filtering unnecessary data. ```suggestion Optional<Monitor> monitorOpt = monitorService.getMonitorByInstance(app, instance); if (monitorOpt.isPresent()) { Integer intervals = monitorOpt.get().getIntervals(); if (intervals != null && intervals > 0) { long minSeconds = 259200L; // 3 days long intervalBasedSeconds = intervals * 100L; long finalSeconds = Math.max(minSeconds, intervalBasedSeconds); dbQueryTime = finalSeconds + "s"; log.debug("[Predict] Training window calculated: {} for interval: {}s", dbQueryTime, intervals); ``` ########## hertzbeat-analysis/src/main/java/org/apache/hertzbeat/analysis/algorithm/NLinearModel.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.analysis.algorithm; + +import java.util.Arrays; +import org.apache.commons.math3.linear.Array2DRowRealMatrix; +import org.apache.commons.math3.linear.ArrayRealVector; +import org.apache.commons.math3.linear.LUDecomposition; +import org.apache.commons.math3.linear.RealMatrix; +import org.apache.commons.math3.linear.RealVector; +import org.apache.commons.math3.linear.SingularMatrixException; +import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation; + +/** + * Industrial-grade Robust NLinear Model. + * Uses Ridge Regression (L2 Regularization) to prevent overfitting and handle singular matrices. + * Note: This class is stateful and not thread-safe. A new instance should be created for each prediction task. + */ +public class NLinearModel { + + private static final int LOOKBACK_WINDOW = 30; + + /** + * Ridge regularization parameter (Lambda). + * A small positive value ensures the matrix is always invertible. + */ + private static final double RIDGE_LAMBDA = 0.01; + + private double[] weights; + private double stdDeviation; + private boolean isFlatLine = false; + private double lastValue = 0.0; + + public void train(double[] historyValues) { + if (historyValues == null || historyValues.length == 0) { + return; + } + + // 1. 
Critical Fix: Always capture the last value first. + // This ensures that even if we don't have enough data to train the full model, + // we can still fallback to a naive "last-value" prediction instead of returning 0. + this.lastValue = historyValues[historyValues.length - 1]; + + // Check if we have enough data for the sliding window approach + if (historyValues.length < LOOKBACK_WINDOW + 5) { + // Fallback: Calculate simple standard deviation for confidence interval + if (historyValues.length > 1) { + StandardDeviation stdDevCalc = new StandardDeviation(); + this.stdDeviation = stdDevCalc.evaluate(historyValues); + } else { + this.stdDeviation = 0.0; + } + return; + } + + // 2. Pre-check: Flat Line Detection + // If variance is 0 (or very close), logic is simple: prediction = last value + StandardDeviation stdDevCalc = new StandardDeviation(); + double historyStd = stdDevCalc.evaluate(historyValues); + if (historyStd < 0.0001) { + this.isFlatLine = true; + this.stdDeviation = 0.0; + return; + } + this.isFlatLine = false; + + // 3. Prepare Data for Ridge Regression + int n = historyValues.length; + int numSamples = n - LOOKBACK_WINDOW; + int numFeatures = LOOKBACK_WINDOW + 1; // +1 for Intercept + + // Matrix X: [Samples x Features] + // Vector Y: [Samples] + double[][] xData = new double[numSamples][numFeatures]; + double[] yData = new double[numSamples]; + + for (int i = 0; i < numSamples; i++) { + double target = historyValues[i + LOOKBACK_WINDOW]; + double xLast = historyValues[i + LOOKBACK_WINDOW - 1]; // RevIN anchor + + yData[i] = target - xLast; // Normalize Y + + // Intercept term (always 1.0) + xData[i][0] = 1.0; + + // Features (Past L points) + for (int j = 0; j < LOOKBACK_WINDOW; j++) { + xData[i][j + 1] = historyValues[i + j] - xLast; // Normalize X + } + } + + // 4. 
Solve Ridge Regression: W = (X'X + lambda*I)^-1 * X'Y + try { + RealMatrix X = new Array2DRowRealMatrix(xData); + RealVector Y = new ArrayRealVector(yData); + + RealMatrix XTrans = X.transpose(); + RealMatrix XTransX = XTrans.multiply(X); + + // Add Lambda to Diagonal (Ridge Regularization) + for (int i = 0; i < numFeatures; i++) { + XTransX.addToEntry(i, i, RIDGE_LAMBDA); + } + + // Solve + RealVector XTransY = XTrans.operate(Y); + // LUDecomposition is fast and stable for square matrices + RealVector W = new LUDecomposition(XTransX).getSolver().solve(XTransY); + + this.weights = W.toArray(); + + // 5. Calculate Training Error (Residual StdDev) + double sumSquaredErrors = 0.0; + for (int i = 0; i < numSamples; i++) { + double prediction = 0.0; + for (int j = 0; j < numFeatures; j++) { + prediction += xData[i][j] * weights[j]; + } + double error = yData[i] - prediction; + sumSquaredErrors += error * error; + } + // StdDev of residuals + this.stdDeviation = Math.sqrt(sumSquaredErrors / numSamples); + + } catch (RuntimeException e) { + // Fallback strategy: just predict the last value + this.isFlatLine = true; + this.stdDeviation = historyStd; // Use global std as uncertainty + } + } + + public PredictionResult[] predict(double[] recentHistory, int steps) { + // If untrained or logic fallback + if (isFlatLine || weights == null) { + PredictionResult[] results = new PredictionResult[steps]; + for (int i = 0; i < steps; i++) { + results[i] = PredictionResult.builder() + .forecast(lastValue) + .upperBound(lastValue + 3 * stdDeviation) + .lowerBound(lastValue - 3 * stdDeviation) + .build(); + } + return results; + } + + if (recentHistory.length < LOOKBACK_WINDOW) { + // Should not happen if training succeeded, but as a safeguard + return new PredictionResult[0]; + } + + PredictionResult[] results = new PredictionResult[steps]; + double[] buffer = Arrays.copyOfRange(recentHistory, recentHistory.length - LOOKBACK_WINDOW, recentHistory.length); + + for (int i = 0; i < 
steps; i++) { + double xLast = buffer[buffer.length - 1]; + + // Apply Weights + // weights[0] is Intercept + double predictionNorm = weights[0]; + + for (int j = 0; j < LOOKBACK_WINDOW; j++) { + double feat = buffer[j] - xLast; // RevIN + predictionNorm += weights[j + 1] * feat; + } + + double prediction = predictionNorm + xLast; + double interval = 3.0 * stdDeviation; + + results[i] = PredictionResult.builder() + .forecast(prediction) + .upperBound(prediction + interval) + .lowerBound(prediction - interval) + .build(); + + // Slide buffer + System.arraycopy(buffer, 1, buffer, 0, LOOKBACK_WINDOW - 1); + buffer[LOOKBACK_WINDOW - 1] = prediction; + } + return results; + } +} Review Comment: The NLinearModel class lacks test coverage. This is a core algorithm with complex logic including ridge regression, flat line detection, and fallback strategies. Unit tests should be added to verify correct behavior for various scenarios: normal data, flat lines, insufficient data, and edge cases with the lookback window. ########## hertzbeat-analysis/src/main/java/org/apache/hertzbeat/analysis/algorithm/NLinearModel.java: ########## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hertzbeat.analysis.algorithm; + +import java.util.Arrays; +import org.apache.commons.math3.linear.Array2DRowRealMatrix; +import org.apache.commons.math3.linear.ArrayRealVector; +import org.apache.commons.math3.linear.LUDecomposition; +import org.apache.commons.math3.linear.RealMatrix; +import org.apache.commons.math3.linear.RealVector; +import org.apache.commons.math3.linear.SingularMatrixException; +import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation; + +/** + * Industrial-grade Robust NLinear Model. + * Uses Ridge Regression (L2 Regularization) to prevent overfitting and handle singular matrices. + * Note: This class is stateful and not thread-safe. A new instance should be created for each prediction task. + */ +public class NLinearModel { + + private static final int LOOKBACK_WINDOW = 30; Review Comment: The LOOKBACK_WINDOW constant is hardcoded to 30. Consider making this configurable through a constructor parameter or application properties to allow tuning based on different use cases (e.g., high-frequency metrics might need different window sizes than low-frequency ones). This would improve the model's flexibility for different monitoring scenarios. ########## hertzbeat-analysis/src/main/java/org/apache/hertzbeat/analysis/algorithm/TimeSeriesPreprocessor.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.analysis.algorithm; + +import java.util.ArrayList; +import java.util.List; +import java.util.TreeMap; +import org.apache.hertzbeat.common.entity.warehouse.History; +import org.springframework.stereotype.Component; + +/** + * Time series data preprocessing tool: Resampling & Interpolation + */ +@Component +public class TimeSeriesPreprocessor { + + /** + * Preprocess raw metric data into a fixed-step, gap-filled double array + * @param rawData Raw history data list + * @param stepMillis Time step in milliseconds (e.g., 60000 for 1 minute) + * @param startTime Start timestamp + * @param endTime End timestamp + * @return Cleaned data array where index 0 corresponds to startTime + */ + public double[] preprocess(List<History> rawData, long stepMillis, long startTime, long endTime) { + if (rawData == null || rawData.isEmpty()) { + return new double[0]; + } + + // 1. Bucket Aggregation (Snap to Grid) + TreeMap<Long, List<Double>> buckets = new TreeMap<>(); + for (History point : rawData) { + long timestamp = point.getTime(); + if (timestamp < startTime || timestamp > endTime) { + continue; + } + // Align timestamp to the grid + long bucketTime = ((timestamp - startTime) / stepMillis) * stepMillis + startTime; + buckets.computeIfAbsent(bucketTime, k -> new ArrayList<>()).add(point.getDou()); + } + + // 2. 
Generate Grid and Impute Missing Values + int expectedSize = (int) ((endTime - startTime) / stepMillis) + 1; + double[] result = new double[expectedSize]; + + Double lastValidValue = null; + int lastValidIndex = -1; + + for (int i = 0; i < expectedSize; i++) { + long currentTime = startTime + (i * stepMillis); + + if (buckets.containsKey(currentTime)) { + // If bucket has data, take average + double avg = buckets.get(currentTime).stream() + .mapToDouble(Double::doubleValue).average().orElse(0.0); + result[i] = avg; + + // Perform linear interpolation for gaps + if (lastValidValue != null && (i - lastValidIndex) > 1) { + fillGap(result, lastValidIndex, i, lastValidValue, avg); + } + + lastValidValue = avg; + lastValidIndex = i; + } else { + // Mark as NaN for now + result[i] = Double.NaN; + } + } + + // 3. Handle Edge Cases (Forward/Backward Fill) + fillEdges(result); + + return result; + } + + /** + * Linear interpolation + */ + private void fillGap(double[] data, int startIndex, int endIndex, double startVal, double endVal) { + int steps = endIndex - startIndex; + double slope = (endVal - startVal) / steps; + + for (int i = 1; i < steps; i++) { + data[startIndex + i] = startVal + (slope * i); + } + } + + /** + * Fill leading/trailing NaNs Review Comment: The comment "Fill leading/trailing NaNs" is minimal. Consider expanding this to explain the forward-fill and backward-fill strategy being used, which would help future maintainers understand the edge case handling logic. ```suggestion * Fills leading and trailing NaN values in the data array using a backward-fill and forward-fill strategy. * <p> * - Backward-fill: Any leading NaNs (at the start of the array) are replaced with the first non-NaN value found. * - Forward-fill: Any remaining NaNs (typically trailing, at the end of the array) are replaced with the last valid value. * This ensures that the resulting array contains no NaN values at the edges, which is important for downstream analysis.
``` ########## hertzbeat-analysis/src/main/java/org/apache/hertzbeat/analysis/algorithm/TimeSeriesPreprocessor.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.analysis.algorithm; + +import java.util.ArrayList; +import java.util.List; +import java.util.TreeMap; +import org.apache.hertzbeat.common.entity.warehouse.History; +import org.springframework.stereotype.Component; + +/** + * Time series data preprocessing tool: Resampling & Interpolation + */ +@Component +public class TimeSeriesPreprocessor { + + /** + * Preprocess raw metric data into a fixed-step, gap-filled double array + * @param rawData Raw history data list + * @param stepMillis Time step in milliseconds (e.g., 60000 for 1 minute) + * @param startTime Start timestamp + * @param endTime End timestamp + * @return Cleaned data array where index 0 corresponds to startTime + */ + public double[] preprocess(List<History> rawData, long stepMillis, long startTime, long endTime) { + if (rawData == null || rawData.isEmpty()) { + return new double[0]; + } + + // 1. 
Bucket Aggregation (Snap to Grid) + TreeMap<Long, List<Double>> buckets = new TreeMap<>(); + for (History point : rawData) { + long timestamp = point.getTime(); + if (timestamp < startTime || timestamp > endTime) { + continue; + } + // Align timestamp to the grid + long bucketTime = ((timestamp - startTime) / stepMillis) * stepMillis + startTime; + buckets.computeIfAbsent(bucketTime, k -> new ArrayList<>()).add(point.getDou()); + } + + // 2. Generate Grid and Impute Missing Values + int expectedSize = (int) ((endTime - startTime) / stepMillis) + 1; + double[] result = new double[expectedSize]; + + Double lastValidValue = null; + int lastValidIndex = -1; + + for (int i = 0; i < expectedSize; i++) { + long currentTime = startTime + (i * stepMillis); + + if (buckets.containsKey(currentTime)) { + // If bucket has data, take average + double avg = buckets.get(currentTime).stream() + .mapToDouble(Double::doubleValue).average().orElse(0.0); + result[i] = avg; + + // Perform linear interpolation for gaps + if (lastValidValue != null && (i - lastValidIndex) > 1) { + fillGap(result, lastValidIndex, i, lastValidValue, avg); + } + + lastValidValue = avg; + lastValidIndex = i; + } else { + // Mark as NaN for now + result[i] = Double.NaN; + } + } + + // 3. Handle Edge Cases (Forward/Backward Fill) + fillEdges(result); + + return result; + } + + /** + * Linear interpolation Review Comment: The comment "Linear interpolation" is minimal. Consider expanding this method-level comment to explain the interpolation algorithm more clearly, including the parameters (startIndex, endIndex, startVal, endVal) and what the method does (fills gaps between two known points with linearly interpolated values). ```suggestion * Fills missing values between two known data points in the given array using linear interpolation. 
* <p> * This method assumes that {@code data[startIndex]} and {@code data[endIndex]} are known (non-missing) values, * and fills the values between them (exclusive) with linearly interpolated values. * <ul> * <li>{@code data} - The array to fill.</li> * <li>{@code startIndex} - The index of the first known value (inclusive).</li> * <li>{@code endIndex} - The index of the next known value (inclusive).</li> * <li>{@code startVal} - The value at {@code startIndex}.</li> * <li>{@code endVal} - The value at {@code endIndex}.</li> * </ul> * The method fills {@code data[startIndex + 1]} up to {@code data[endIndex - 1]} with values computed as: * {@code data[startIndex + i] = startVal + (endVal - startVal) * i / (endIndex - startIndex)}, for i in 1..(endIndex - startIndex - 1). ``` ########## hertzbeat-analysis/src/main/java/org/apache/hertzbeat/analysis/algorithm/TimeSeriesPreprocessor.java: ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hertzbeat.analysis.algorithm; + +import java.util.ArrayList; +import java.util.List; +import java.util.TreeMap; +import org.apache.hertzbeat.common.entity.warehouse.History; +import org.springframework.stereotype.Component; + +/** + * Time series data preprocessing tool: Resampling & Interpolation + */ +@Component +public class TimeSeriesPreprocessor { + + /** + * Preprocess raw metric data into a fixed-step, gap-filled double array + * @param rawData Raw history data list + * @param stepMillis Time step in milliseconds (e.g., 60000 for 1 minute) + * @param startTime Start timestamp + * @param endTime End timestamp + * @return Cleaned data array where index 0 corresponds to startTime + */ + public double[] preprocess(List<History> rawData, long stepMillis, long startTime, long endTime) { + if (rawData == null || rawData.isEmpty()) { + return new double[0]; + } + + // 1. Bucket Aggregation (Snap to Grid) + TreeMap<Long, List<Double>> buckets = new TreeMap<>(); + for (History point : rawData) { + long timestamp = point.getTime(); + if (timestamp < startTime || timestamp > endTime) { + continue; + } + // Align timestamp to the grid + long bucketTime = ((timestamp - startTime) / stepMillis) * stepMillis + startTime; + buckets.computeIfAbsent(bucketTime, k -> new ArrayList<>()).add(point.getDou()); + } + + // 2. 
Generate Grid and Impute Missing Values + int expectedSize = (int) ((endTime - startTime) / stepMillis) + 1; + double[] result = new double[expectedSize]; + + Double lastValidValue = null; + int lastValidIndex = -1; + + for (int i = 0; i < expectedSize; i++) { + long currentTime = startTime + (i * stepMillis); + + if (buckets.containsKey(currentTime)) { + // If bucket has data, take average + double avg = buckets.get(currentTime).stream() + .mapToDouble(Double::doubleValue).average().orElse(0.0); + result[i] = avg; + + // Perform linear interpolation for gaps + if (lastValidValue != null && (i - lastValidIndex) > 1) { + fillGap(result, lastValidIndex, i, lastValidValue, avg); + } + + lastValidValue = avg; + lastValidIndex = i; + } else { + // Mark as NaN for now + result[i] = Double.NaN; + } + } + + // 3. Handle Edge Cases (Forward/Backward Fill) + fillEdges(result); + + return result; + } + + /** + * Linear interpolation + */ + private void fillGap(double[] data, int startIndex, int endIndex, double startVal, double endVal) { + int steps = endIndex - startIndex; + double slope = (endVal - startVal) / steps; + + for (int i = 1; i < steps; i++) { + data[startIndex + i] = startVal + (slope * i); + } + } + + /** + * Fill leading/trailing NaNs + */ + private void fillEdges(double[] data) { + if (data.length == 0) { + return; + } + + // Forward Fill (Head) + int firstValid = -1; + for (int i = 0; i < data.length; i++) { + if (!Double.isNaN(data[i])) { + firstValid = i; + break; + } + } + + if (firstValid == -1) { + return; // All NaN + } + for (int i = 0; i < firstValid; i++) { + data[i] = data[firstValid]; + } + + // Backward Fill (Tail) + for (int i = firstValid + 1; i < data.length; i++) { + if (Double.isNaN(data[i])) { + data[i] = data[i - 1]; + } + } + } +} Review Comment: The TimeSeriesPreprocessor class lacks test coverage. This component handles critical data preprocessing including bucket aggregation, interpolation, and edge case handling. 
Unit tests should be added to verify correct behavior for scenarios like missing data, gaps in time series, edge fill operations, and various bucket sizes. ########## hertzbeat-analysis/src/main/java/org/apache/hertzbeat/analysis/service/impl/AnalysisServiceImpl.java: ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hertzbeat.analysis.service.impl; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.hertzbeat.analysis.algorithm.NLinearModel; +import org.apache.hertzbeat.analysis.algorithm.PredictionResult; +import org.apache.hertzbeat.analysis.algorithm.TimeSeriesPreprocessor; +import org.apache.hertzbeat.analysis.service.AnalysisService; +import org.apache.hertzbeat.common.entity.warehouse.History; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +/** + * Implementation of Analysis Service + */ +@Slf4j +@Service +public class AnalysisServiceImpl implements AnalysisService { + + @Autowired + private TimeSeriesPreprocessor preprocessor; + + @Override + public List<PredictionResult> forecast(List<History> historyData, long stepMillis, int forecastCount) { + if (historyData == null || historyData.isEmpty()) { + return Collections.emptyList(); + } + + // 1. Determine time range + long minTime = Long.MAX_VALUE; + long maxTime = Long.MIN_VALUE; + for (History h : historyData) { + if (h.getTime() < minTime) { + minTime = h.getTime(); + } + if (h.getTime() > maxTime) { + maxTime = h.getTime(); + } + } + + // Align start/end to step + long startTime = (minTime / stepMillis) * stepMillis; + long endTime = (maxTime / stepMillis) * stepMillis; + + // 2. Preprocess Data + double[] y = preprocessor.preprocess(historyData, stepMillis, startTime, endTime); + + // 3. Train Model (Stateful, so create a new instance for each request) + NLinearModel model = new NLinearModel(); + model.train(y); + + // 4. Forecast + PredictionResult[] predictions = model.predict(y, forecastCount); + + // 5. 
Convert and add timestamps + List<PredictionResult> forecastResult = new ArrayList<>(forecastCount); + + // Use the actual last timestamp from preprocessing as the base for future time + // Note: endTime calculated above is the timestamp of the last bucket + long lastTimestamp = endTime; + + for (int i = 0; i < predictions.length; i++) { + PredictionResult result = predictions[i]; + + // Critical: Set the absolute timestamp for the frontend + // i=0 is the first future point, so time = lastTimestamp + 1 * step + result.setTime(lastTimestamp + ((i + 1) * stepMillis)); + + forecastResult.add(result); + } + + return forecastResult; + } +} Review Comment: The AnalysisServiceImpl class lacks test coverage. This is a key service that orchestrates data preprocessing and model training. Unit tests should be added to verify the forecast method handles various scenarios correctly, including empty data, edge cases with time alignment, and integration with the preprocessor and model. ########## hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/controller/AnalysisController.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hertzbeat.manager.controller; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.hertzbeat.analysis.algorithm.PredictionResult; +import org.apache.hertzbeat.analysis.service.AnalysisService; +import org.apache.hertzbeat.common.constants.CommonConstants; +import org.apache.hertzbeat.common.entity.dto.Message; +import org.apache.hertzbeat.common.entity.dto.MetricsHistoryData; +import org.apache.hertzbeat.common.entity.dto.Value; +import org.apache.hertzbeat.common.entity.job.Job; +import org.apache.hertzbeat.common.entity.job.Metrics; +import org.apache.hertzbeat.common.entity.manager.Monitor; +import org.apache.hertzbeat.common.entity.warehouse.History; +import org.apache.hertzbeat.manager.service.AppService; +import org.apache.hertzbeat.manager.service.MonitorService; +import org.apache.hertzbeat.warehouse.service.MetricsDataService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * Analysis and Prediction Controller + */ +@Tag(name = "Analysis Prediction API") +@RestController +@RequestMapping(path = "/api/analysis") +@Slf4j +public class AnalysisController { + + @Autowired + private MetricsDataService metricsDataService; + + @Autowired + private AnalysisService analysisService; + + @Autowired + private AppService appService; + + @Autowired + private MonitorService monitorService; + + 
@GetMapping("/predict/{instance}/{app}/{metrics}/{metric}") + @Operation(summary = "Predict metric data", description = "Forecast future metric data based on history") + public Message<Map<String, List<PredictionResult>>> getMetricPrediction( + @Parameter(description = "Monitor Instance", example = "127.0.0.1") @PathVariable String instance, + @Parameter(description = "App Type", example = "linux") @PathVariable String app, + @Parameter(description = "Metrics Name", example = "cpu") @PathVariable String metrics, + @Parameter(description = "Metric Name", example = "usage") @PathVariable String metric, + @Parameter(description = "History time range", example = "6h") @RequestParam(required = false) String history + ) { + // 1. Context Analysis + // We separate "User View Window" (history) from "Model Training Window" (dbQueryTime). + // User view: what the user sees (e.g., 1h). We should predict ~20% of this length. + // Training window: what the model needs (e.g., 3 days). We force this to be large. + + String userViewTime = history != null ? history : "6h"; + long userViewMillis = parseSimpleDuration(userViewTime); + if (userViewMillis <= 0) { + userViewMillis = 6 * 60 * 60 * 1000L; // default 6h + } + + // 2. 
Determine Training Window (Strategy: Max(3 days, 100 * interval)) + String dbQueryTime = "6h"; // initial fallback + try { + List<Monitor> monitors = monitorService.getAppMonitors(app); + if (monitors != null) { + Optional<Monitor> monitorOpt = monitors.stream() + .filter(m -> instance.equals(m.getInstance())) + .findFirst(); + + if (monitorOpt.isPresent()) { + Integer intervals = monitorOpt.get().getIntervals(); + if (intervals != null && intervals > 0) { + long minSeconds = 259200L; // 3 days + long intervalBasedSeconds = intervals * 100L; + long finalSeconds = Math.max(minSeconds, intervalBasedSeconds); + dbQueryTime = finalSeconds + "s"; + log.debug("[Predict] Training window calculated: {} for interval: {}s", dbQueryTime, intervals); + } + } + } + } catch (Exception e) { + log.warn("[Predict] Failed to calculate dynamic history for instance: {}, using default.", instance, e); + } + + // 3. Validate Metric Type + Optional<Job> jobOptional = appService.getAppDefineOption(app); + if (jobOptional.isEmpty()) { + return Message.fail(CommonConstants.FAIL_CODE, "Application definition not found: " + app); + } + Job job = jobOptional.get(); + + Optional<Metrics> metricsDefineOpt = job.getMetrics().stream() + .filter(m -> m.getName().equals(metrics)) + .findFirst(); + if (metricsDefineOpt.isEmpty()) { + return Message.fail(CommonConstants.FAIL_CODE, "Metrics group not found: " + metrics); + } + + Optional<Metrics.Field> fieldDefineOpt = metricsDefineOpt.get().getFields().stream() + .filter(f -> f.getField().equals(metric)) + .findFirst(); + if (fieldDefineOpt.isEmpty()) { + return Message.fail(CommonConstants.FAIL_CODE, "Metric field not found: " + metric); + } + + if (fieldDefineOpt.get().getType() != CommonConstants.TYPE_NUMBER) { + return Message.fail(CommonConstants.FAIL_CODE, "Prediction is only supported for numeric metrics."); + } + + // 4. 
Get Training Data (Using the Long Window) + MetricsHistoryData historyData = metricsDataService.getMetricHistoryData( + instance, app, metrics, metric, dbQueryTime, false); + + if (historyData == null || historyData.getValues() == null || historyData.getValues().isEmpty()) { + return Message.success(Collections.emptyMap()); + } + + Map<String, List<PredictionResult>> resultMap = new HashMap<>(); + + // Capture effectively final variable for lambda + final long viewWindowMillis = userViewMillis; + + // 5. Iterate and Forecast + historyData.getValues().forEach((rowInstance, values) -> { + if (values == null || values.size() < 10) { + return; + } + List<History> validHistory = new ArrayList<>(); + + for (Value v : values) { + try { + if (v.getOrigin() != null && !CommonConstants.NULL_VALUE.equals(v.getOrigin())) { + double val = Double.parseDouble(v.getOrigin()); + validHistory.add(History.builder() + .time(v.getTime()) + .dou(val) + .metricType(CommonConstants.TYPE_NUMBER) + .build()); + } + } catch (NumberFormatException ignored) {} + } + + if (validHistory.size() > 10) { + long step = estimateStep(validHistory); + + // Smart Calculation of Forecast Count + // Rule: Predict 1/5 of the user's current view window + long forecastDuration = viewWindowMillis / 5; + int dynamicCount = (int) (forecastDuration / step); + + // Bounds checking + if (dynamicCount < 5) dynamicCount = 5; // Minimum 5 points + if (dynamicCount > 2000) dynamicCount = 2000; // Safety cap + + log.debug("[Predict] View: {}ms, Forecast: {}ms ({} steps), Step: {}ms", + viewWindowMillis, forecastDuration, dynamicCount, step); + + List<PredictionResult> forecast = analysisService.forecast(validHistory, step, dynamicCount); + + if (!forecast.isEmpty()) { + resultMap.put(rowInstance, forecast); + } + } + }); + + return Message.success(resultMap); + } Review Comment: The AnalysisController lacks test coverage. 
Given that the repository has comprehensive test coverage for other controllers in the hertzbeat-manager module (e.g., AccountControllerTest, AppControllerTest), a corresponding AnalysisControllerTest should be added to test the prediction endpoint, validation logic, and error handling. ########## hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/controller/AnalysisController.java: ########## @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hertzbeat.manager.controller; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.apache.hertzbeat.analysis.algorithm.PredictionResult; +import org.apache.hertzbeat.analysis.service.AnalysisService; +import org.apache.hertzbeat.common.constants.CommonConstants; +import org.apache.hertzbeat.common.entity.dto.Message; +import org.apache.hertzbeat.common.entity.dto.MetricsHistoryData; +import org.apache.hertzbeat.common.entity.dto.Value; +import org.apache.hertzbeat.common.entity.job.Job; +import org.apache.hertzbeat.common.entity.job.Metrics; +import org.apache.hertzbeat.common.entity.manager.Monitor; +import org.apache.hertzbeat.common.entity.warehouse.History; +import org.apache.hertzbeat.manager.service.AppService; +import org.apache.hertzbeat.manager.service.MonitorService; +import org.apache.hertzbeat.warehouse.service.MetricsDataService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +/** + * Analysis and Prediction Controller + */ +@Tag(name = "Analysis Prediction API") +@RestController +@RequestMapping(path = "/api/analysis") +@Slf4j +public class AnalysisController { + + @Autowired + private MetricsDataService metricsDataService; + + @Autowired + private AnalysisService analysisService; + + @Autowired + private AppService appService; + + @Autowired + private MonitorService monitorService; + + 
@GetMapping("/predict/{instance}/{app}/{metrics}/{metric}") + @Operation(summary = "Predict metric data", description = "Forecast future metric data based on history") + public Message<Map<String, List<PredictionResult>>> getMetricPrediction( + @Parameter(description = "Monitor Instance", example = "127.0.0.1") @PathVariable String instance, + @Parameter(description = "App Type", example = "linux") @PathVariable String app, + @Parameter(description = "Metrics Name", example = "cpu") @PathVariable String metrics, + @Parameter(description = "Metric Name", example = "usage") @PathVariable String metric, + @Parameter(description = "History time range", example = "6h") @RequestParam(required = false) String history + ) { + // 1. Context Analysis + // We separate "User View Window" (history) from "Model Training Window" (dbQueryTime). + // User view: what the user sees (e.g., 1h). We should predict ~20% of this length. + // Training window: what the model needs (e.g., 3 days). We force this to be large. + + String userViewTime = history != null ? history : "6h"; + long userViewMillis = parseSimpleDuration(userViewTime); + if (userViewMillis <= 0) { + userViewMillis = 6 * 60 * 60 * 1000L; // default 6h + } + + // 2. 
Determine Training Window (Strategy: Max(3 days, 100 * interval)) + String dbQueryTime = "6h"; // initial fallback + try { + List<Monitor> monitors = monitorService.getAppMonitors(app); + if (monitors != null) { + Optional<Monitor> monitorOpt = monitors.stream() + .filter(m -> instance.equals(m.getInstance())) + .findFirst(); + + if (monitorOpt.isPresent()) { + Integer intervals = monitorOpt.get().getIntervals(); + if (intervals != null && intervals > 0) { + long minSeconds = 259200L; // 3 days + long intervalBasedSeconds = intervals * 100L; + long finalSeconds = Math.max(minSeconds, intervalBasedSeconds); + dbQueryTime = finalSeconds + "s"; + log.debug("[Predict] Training window calculated: {} for interval: {}s", dbQueryTime, intervals); + } + } + } + } catch (Exception e) { + log.warn("[Predict] Failed to calculate dynamic history for instance: {}, using default.", instance, e); + } + + // 3. Validate Metric Type + Optional<Job> jobOptional = appService.getAppDefineOption(app); + if (jobOptional.isEmpty()) { + return Message.fail(CommonConstants.FAIL_CODE, "Application definition not found: " + app); + } + Job job = jobOptional.get(); + + Optional<Metrics> metricsDefineOpt = job.getMetrics().stream() + .filter(m -> m.getName().equals(metrics)) + .findFirst(); + if (metricsDefineOpt.isEmpty()) { + return Message.fail(CommonConstants.FAIL_CODE, "Metrics group not found: " + metrics); + } + + Optional<Metrics.Field> fieldDefineOpt = metricsDefineOpt.get().getFields().stream() + .filter(f -> f.getField().equals(metric)) + .findFirst(); + if (fieldDefineOpt.isEmpty()) { + return Message.fail(CommonConstants.FAIL_CODE, "Metric field not found: " + metric); + } + + if (fieldDefineOpt.get().getType() != CommonConstants.TYPE_NUMBER) { + return Message.fail(CommonConstants.FAIL_CODE, "Prediction is only supported for numeric metrics."); + } + + // 4. 
Get Training Data (Using the Long Window) + MetricsHistoryData historyData = metricsDataService.getMetricHistoryData( + instance, app, metrics, metric, dbQueryTime, false); + + if (historyData == null || historyData.getValues() == null || historyData.getValues().isEmpty()) { + return Message.success(Collections.emptyMap()); + } + + Map<String, List<PredictionResult>> resultMap = new HashMap<>(); + + // Capture effectively final variable for lambda + final long viewWindowMillis = userViewMillis; + + // 5. Iterate and Forecast + historyData.getValues().forEach((rowInstance, values) -> { + if (values == null || values.size() < 10) { + return; + } + List<History> validHistory = new ArrayList<>(); + + for (Value v : values) { + try { + if (v.getOrigin() != null && !CommonConstants.NULL_VALUE.equals(v.getOrigin())) { + double val = Double.parseDouble(v.getOrigin()); + validHistory.add(History.builder() + .time(v.getTime()) + .dou(val) + .metricType(CommonConstants.TYPE_NUMBER) + .build()); + } + } catch (NumberFormatException ignored) {} + } + + if (validHistory.size() > 10) { + long step = estimateStep(validHistory); + + // Smart Calculation of Forecast Count + // Rule: Predict 1/5 of the user's current view window + long forecastDuration = viewWindowMillis / 5; + int dynamicCount = (int) (forecastDuration / step); + + // Bounds checking + if (dynamicCount < 5) dynamicCount = 5; // Minimum 5 points + if (dynamicCount > 2000) dynamicCount = 2000; // Safety cap + + log.debug("[Predict] View: {}ms, Forecast: {}ms ({} steps), Step: {}ms", + viewWindowMillis, forecastDuration, dynamicCount, step); + + List<PredictionResult> forecast = analysisService.forecast(validHistory, step, dynamicCount); + + if (!forecast.isEmpty()) { + resultMap.put(rowInstance, forecast); + } + } + }); + + return Message.success(resultMap); + } + + /** + * Simple parser for standard time tokens (1h, 6h, 1d, 1w, 4w, 12w) Review Comment: The Javadoc comment for parseSimpleDuration could be more 
comprehensive. It should document the return value (milliseconds), what happens with invalid input (returns 0), and provide examples of valid input formats. This would help API consumers understand the expected behavior. ```suggestion * Parses a simple duration string (e.g., "1h", "6h", "1d", "1w", "4w", "12w") into milliseconds. * <p> * Supported time units are: * <ul> * <li><b>s</b> - seconds (e.g., "30s")</li> * <li><b>m</b> - minutes (e.g., "15m")</li> * <li><b>h</b> - hours (e.g., "1h", "6h")</li> * <li><b>d</b> - days (e.g., "1d")</li> * <li><b>w</b> - weeks (e.g., "4w", "12w")</li> * </ul> * <p> * Examples of valid input: * <ul> * <li>"1h" -> 3,600,000</li> * <li>"6h" -> 21,600,000</li> * <li>"1d" -> 86,400,000</li> * <li>"4w" -> 2,419,200,000</li> * </ul> * <p> * If the input is invalid or cannot be parsed, returns 0. * * @param timeToken the duration string to parse * @return the duration in milliseconds, or 0 if input is invalid ``` ########## hertzbeat-analysis/pom.xml: ########## @@ -0,0 +1,59 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.hertzbeat</groupId> + <artifactId>hertzbeat</artifactId> + <version>2.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + + <dependencies> + <dependency> + <groupId>org.apache.hertzbeat</groupId> + <artifactId>hertzbeat-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hertzbeat</groupId> + <artifactId>hertzbeat-warehouse</artifactId> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-web</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + </dependency> + </dependencies> + + + + <artifactId>hertzbeat-analysis</artifactId> + <name>${project.artifactId}</name> Review Comment: The element order in this POM deviates from the Maven POM code convention, which lists modelVersion as the first child of project (before parent), followed by parent and then the module's own coordinates (artifactId, name) ahead of dependencies. Here modelVersion comes after the parent element, and the artifactId and name elements are placed after the dependencies section, which is unconventional. Consider moving modelVersion above the parent element and placing the artifactId and name elements immediately after the parent closing tag, before the dependencies section, for better consistency with Maven conventions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
