This is an automated email from the ASF dual-hosted git repository.
zqr10159 pushed a commit to branch 2.0.0
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/2.0.0 by this push:
new b0f33f8bd4 Probe related metric candidates with PromQL
b0f33f8bd4 is described below
commit b0f33f8bd44ee698b65fcf2d10159bab5e114087
Author: Logic <[email protected]>
AuthorDate: Wed Jun 10 00:08:21 2026 +0800
Probe related metric candidates with PromQL
---
.../impl/OtlpIngestionWorkspaceServiceImpl.java | 102 ++++++++++++++++++++-
.../OtlpIngestionWorkspaceServiceImplTest.java | 74 +++++++++++++++
2 files changed, 172 insertions(+), 4 deletions(-)
diff --git
a/hertzbeat-observability/src/main/java/org/apache/hertzbeat/observability/ingestion/service/impl/OtlpIngestionWorkspaceServiceImpl.java
b/hertzbeat-observability/src/main/java/org/apache/hertzbeat/observability/ingestion/service/impl/OtlpIngestionWorkspaceServiceImpl.java
index 066e94eb2c..1161040571 100644
---
a/hertzbeat-observability/src/main/java/org/apache/hertzbeat/observability/ingestion/service/impl/OtlpIngestionWorkspaceServiceImpl.java
+++
b/hertzbeat-observability/src/main/java/org/apache/hertzbeat/observability/ingestion/service/impl/OtlpIngestionWorkspaceServiceImpl.java
@@ -91,6 +91,7 @@ public class OtlpIngestionWorkspaceServiceImpl implements
OtlpIngestionWorkspace
private static final String DEFAULT_METRICS_GROUP_BY = String.join(", ",
METRICS_ENTITY_CONTEXT_GROUP_LABELS);
private static final String DEFAULT_METRICS_AGGREGATION = "sum";
private static final String METRICS_CONSOLE_REF_ID =
"otlp-metrics-console";
+ private static final String RELATED_METRICS_REF_ID =
"otlp-related-metrics";
private static final Pattern METRICS_FILTER_MATCHER = Pattern.compile(
"\\s*([A-Za-z_:][A-Za-z0-9_.:-]*)\\s*(=~|!~|!=|=)\\s*(?:\"((?:\\\\.|[^\"\\\\])*)\"|'((?:\\\\.|[^'\\\\])*)'|([^,\\s]+))\\s*"
);
@@ -800,7 +801,7 @@ public class OtlpIngestionWorkspaceServiceImpl implements
OtlpIngestionWorkspace
String normalizedOperationName = trimToNull(operationName);
List<OtlpRelatedMetricsDto.ResourceMatcher> resourceMatchers =
parseRelatedMetricResourceMatchers(normalizedFilter);
List<OtlpRelatedMetricsDto.Candidate> candidates =
buildRelatedMetricCandidates(
- context, resourceMatchers, normalizedOperationName,
resolvedLimit
+ context, resourceMatchers, normalizedOperationName,
resolvedStart, resolvedEnd, resolvedLimit
);
return new OtlpRelatedMetricsDto(
context,
@@ -823,6 +824,8 @@ public class OtlpIngestionWorkspaceServiceImpl implements
OtlpIngestionWorkspace
OtlpMetricsConsoleDto.Context context,
List<OtlpRelatedMetricsDto.ResourceMatcher> resourceMatchers,
String operationName,
+ long start,
+ long end,
int limit) {
LinkedHashMap<String, OtlpRelatedMetricsDto.Candidate> candidates =
new LinkedHashMap<>();
Map<String, String> resourceMatch =
resourceMatcherValueMap(resourceMatchers);
@@ -859,11 +862,102 @@ public class OtlpIngestionWorkspaceServiceImpl
implements OtlpIngestionWorkspace
? operationContextResourceMatch(context,
operationName)
: serviceContextResourceMatch(context)
);
- if (candidates.size() >= limit) {
- return List.copyOf(candidates.values());
+ }
+ List<OtlpRelatedMetricsDto.Candidate> rawCandidates =
candidates.values().stream()
+ .limit(MAX_RELATED_METRICS_LIMIT)
+ .toList();
+ if (metricQueryRepository.hasPromqlExecutor()) {
+ List<OtlpRelatedMetricsDto.Candidate> availableCandidates =
+
filterPromqlAvailableRelatedMetricCandidates(rawCandidates, start, end, limit);
+ if (!availableCandidates.isEmpty()) {
+ return availableCandidates;
+ }
+ }
+ return rawCandidates.stream().limit(limit).toList();
+ }
+
+ private List<OtlpRelatedMetricsDto.Candidate>
filterPromqlAvailableRelatedMetricCandidates(
+ List<OtlpRelatedMetricsDto.Candidate> candidates,
+ long start,
+ long end,
+ int limit) {
+ if (CollectionUtils.isEmpty(candidates) || limit <= 0) {
+ return List.of();
+ }
+ String step = resolvePromqlStep(start, end, null);
+ List<OtlpRelatedMetricsDto.Candidate> available = new ArrayList<>();
+ for (OtlpRelatedMetricsDto.Candidate candidate : candidates) {
+ for (String query :
buildRelatedMetricAvailabilityQueries(candidate)) {
+ MetricQueryRepository.PromqlRangeQueryResult result =
metricQueryRepository.queryPromqlRange(
+ RELATED_METRICS_REF_ID,
+ query,
+ start,
+ end,
+ step
+ );
+ if (result.errorMessage() != null) {
+ log.debug("query related metric candidate failed: {}",
result.errorMessage());
+ continue;
+ }
+ if
(buildMetricsConsoleStats(result.results()).getNonEmptySeries() > 0) {
+ available.add(new OtlpRelatedMetricsDto.Candidate(
+ candidate.getQuery(),
+ candidate.getSource(),
+ candidate.getFamily(),
+ "promql-series",
+ candidate.getMatchedLabels(),
+ candidate.getResourceMatch()
+ ));
+ break;
+ }
+ }
+ if (available.size() >= limit) {
+ break;
+ }
+ }
+ return available;
+ }
+
+ private List<String>
buildRelatedMetricAvailabilityQueries(OtlpRelatedMetricsDto.Candidate
candidate) {
+ if (candidate == null || !StringUtils.hasText(candidate.getQuery())) {
+ return List.of();
+ }
+ String metricName = normalizePromqlMetricName(candidate.getQuery());
+ if (!StringUtils.hasText(metricName)) {
+ return List.of();
+ }
+ Map<String, String> resourceMatch = candidate.getResourceMatch() ==
null
+ ? Map.of()
+ : candidate.getResourceMatch();
+ if ("operation".equals(candidate.getSource())
+ && StringUtils.hasText(resourceMatch.get("operation_name"))
+ && StringUtils.hasText(resourceMatch.get("http_route"))) {
+ LinkedHashMap<String, String> operationNameMatch = new
LinkedHashMap<>(resourceMatch);
+ operationNameMatch.remove("http_route");
+ LinkedHashMap<String, String> httpRouteMatch = new
LinkedHashMap<>(resourceMatch);
+ httpRouteMatch.remove("operation_name");
+ return List.of(
+ buildRelatedMetricAvailabilityQuery(metricName,
operationNameMatch),
+ buildRelatedMetricAvailabilityQuery(metricName,
httpRouteMatch)
+ );
+ }
+ return List.of(buildRelatedMetricAvailabilityQuery(metricName,
resourceMatch));
+ }
+
+ private String buildRelatedMetricAvailabilityQuery(String metricName,
Map<String, String> resourceMatch) {
+ List<String> matchers = new ArrayList<>();
+ matchers.add("__name__=\"" + escapePromqlLabelValue(metricName) +
"\"");
+ if (!CollectionUtils.isEmpty(resourceMatch)) {
+ for (Map.Entry<String, String> entry : resourceMatch.entrySet()) {
+ String label = normalizePromqlLabelName(entry.getKey());
+ String value = trimToNull(entry.getValue());
+ if (!isPromqlLabelName(label) || !StringUtils.hasText(value)) {
+ continue;
+ }
+ matchers.add(label + "=\"" + escapePromqlLabelValue(value) +
"\"");
}
}
- return candidates.values().stream().limit(limit).toList();
+ return "sum by (__name__) ({" + String.join(", ", matchers) + "})";
}
private void addRelatedMetricCandidate(LinkedHashMap<String,
OtlpRelatedMetricsDto.Candidate> candidates,
diff --git
a/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/ingestion/service/impl/OtlpIngestionWorkspaceServiceImplTest.java
b/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/ingestion/service/impl/OtlpIngestionWorkspaceServiceImplTest.java
index cbd0832a79..36816bcd21 100644
---
a/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/ingestion/service/impl/OtlpIngestionWorkspaceServiceImplTest.java
+++
b/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/ingestion/service/impl/OtlpIngestionWorkspaceServiceImplTest.java
@@ -1167,6 +1167,80 @@ class OtlpIngestionWorkspaceServiceImplTest {
&& "POST
/checkout".equals(candidate.getResourceMatch().get("http_route"))));
}
+ @Test
+ void relatedMetricsPrefersPromqlAvailableCandidatesWhenExecutorExists() {
+ observabilitySignalIntakeGateway.recordOtlpMetricIntake(
+ Map.of(
+ "service.name", "checkout",
+ "service.namespace", "commerce",
+ "deployment.environment.name", "prod"
+ ),
+ 2_000L,
+ "http.server.duration",
+ "histogram",
+ "ms",
+ 14.0,
+ Map.of()
+ );
+
when(workspaceQueryGateway.findEntityById(42L)).thenReturn(java.util.Optional.empty());
+
when(workspaceQueryGateway.findIdentitiesByEntityId(42L)).thenReturn(List.of());
+ when(metricQueryRepository.hasPromqlExecutor()).thenReturn(true);
+ when(metricQueryRepository.queryPromqlRange(
+ eq("otlp-related-metrics"),
+ anyString(),
+ anyLong(),
+ anyLong(),
+ anyString()
+ )).thenAnswer(invocation -> {
+ String query = invocation.getArgument(1);
+ if (query.contains("__name__=\"http_server_duration\"")) {
+ return promqlSuccess(new DatasourceQueryData(
+ "otlp-related-metrics",
+ 200,
+ null,
+ List.of(new DatasourceQueryData.SchemaData(
+ new DatasourceQueryData.MetricSchema(
+ List.of(
+ new
DatasourceQueryData.MetricField("__ts__", "time", null),
+ new
DatasourceQueryData.MetricField("__value__", "number", null)
+ ),
+ Map.of("__name__",
"http_server_duration"),
+ Map.of()
+ ),
+ Collections.singletonList(new Object[]
{2_000L, 14.0})
+ ))
+ ));
+ }
+ return promqlSuccess(new
DatasourceQueryData("otlp-related-metrics", 200, null, List.of()));
+ });
+
+ OtlpRelatedMetricsDto related =
otlpIngestionWorkspaceService.getRelatedMetrics(
+ 42L,
+ "service",
+ 1_000L,
+ 2_000L,
+ "checkout",
+ "commerce",
+ "prod",
+ "k8s.pod.name=\"checkout-7d9\" and host.name=\"node-a\"",
+ null,
+ "8"
+ );
+
+ assertFalse(related.getCandidates().isEmpty());
+ assertEquals("http_server_duration",
related.getCandidates().getFirst().getQuery());
+ assertEquals("promql-series",
related.getCandidates().getFirst().getReason());
+ verify(metricQueryRepository).queryPromqlRange(
+ eq("otlp-related-metrics"),
+ argThat(query ->
query.contains("__name__=\"http_server_duration\"")
+ && query.contains("service_name=\"checkout\"")
+ && query.contains("service_namespace=\"commerce\"")),
+ eq(1_000L),
+ eq(2_000L),
+ anyString()
+ );
+ }
+
@Test
void metricsConsoleAppliesFilterWhenQueryIsExplicitMetricName() {
String expectedQuery =
groupedMetricPromql("__name__=\"http_server_request_duration_count\", "
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]