This is an automated email from the ASF dual-hosted git repository.
zqr10159 pushed a commit to branch 2.0.0
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/2.0.0 by this push:
new ed0c191325 Enrich signal evidence snapshots with entity context
ed0c191325 is described below
commit ed0c191325e63fbf9c8f3dc16e4b516a1ef2c066
Author: Logic <[email protected]>
AuthorDate: Tue Jun 9 20:02:47 2026 +0800
Enrich signal evidence snapshots with entity context
---
.../service/impl/TelemetryIntakeServiceImpl.java | 100 +++++++++++++--------
.../impl/TelemetryIntakeServiceImplTest.java | 24 +++++
2 files changed, 89 insertions(+), 35 deletions(-)
diff --git
a/hertzbeat-observability/src/main/java/org/apache/hertzbeat/observability/shared/service/impl/TelemetryIntakeServiceImpl.java
b/hertzbeat-observability/src/main/java/org/apache/hertzbeat/observability/shared/service/impl/TelemetryIntakeServiceImpl.java
index 7b53fa7675..c8f8a4e706 100644
---
a/hertzbeat-observability/src/main/java/org/apache/hertzbeat/observability/shared/service/impl/TelemetryIntakeServiceImpl.java
+++
b/hertzbeat-observability/src/main/java/org/apache/hertzbeat/observability/shared/service/impl/TelemetryIntakeServiceImpl.java
@@ -95,6 +95,8 @@ public class TelemetryIntakeServiceImpl implements
TelemetryEvidenceGateway {
private static final String OTLP_METRIC_HISTOGRAM_BUCKET_COUNTS =
"otlp.metric.histogram.bucket_counts";
private static final String OTLP_METRIC_HISTOGRAM_EXPLICIT_BOUNDS =
"otlp.metric.histogram.explicit_bounds";
private static final String HERTZBEAT_ENTITY_ID =
OtlpResourceSemanticAttributes.HERTZBEAT_ENTITY_ID;
+ private static final String HERTZBEAT_ENTITY_TYPE =
OtlpResourceSemanticAttributes.HERTZBEAT_ENTITY_TYPE;
+ private static final String HERTZBEAT_ENTITY_NAME =
OtlpResourceSemanticAttributes.HERTZBEAT_ENTITY_NAME;
private static final String HERTZBEAT_WORKSPACE_ID =
OtlpResourceSemanticAttributes.HERTZBEAT_WORKSPACE_ID;
private static final Set<String> WORKSPACE_INFRA_SERVICE_NAMES = Set.of(
"otelcol-contrib",
@@ -620,15 +622,17 @@ public class TelemetryIntakeServiceImpl implements
TelemetryEvidenceGateway {
if (!matchesEntitySignal(signal.canonicalIdentities(),
entityIdentityKeys, entityIdentityValues, entityId)) {
continue;
}
+ Map<String, String> snapshotIdentities =
+
enrichCanonicalIdentitiesWithEntityContext(signal.canonicalIdentities(),
entityContext);
TelemetryIdentitySnapshot identitySnapshot = new
TelemetryIdentitySnapshot(
SOURCE_OTLP,
SIGNAL_LOGS,
- signal.canonicalIdentities(),
- signal.canonicalIdentities().get("service.name"),
- signal.canonicalIdentities().get("service.namespace"),
-
signal.canonicalIdentities().get("deployment.environment.name"),
- signal.canonicalIdentities().get("service.instance.id"),
- signal.canonicalIdentities().get("host.name"),
+ snapshotIdentities,
+ snapshotIdentities.get("service.name"),
+ snapshotIdentities.get("service.namespace"),
+ snapshotIdentities.get("deployment.environment.name"),
+ snapshotIdentities.get("service.instance.id"),
+ snapshotIdentities.get("host.name"),
signal.observedAt()
);
List<String> searchTerms = preferredHint == null ||
CollectionUtils.isEmpty(preferredHint.getSearchTerms())
@@ -739,15 +743,17 @@ public class TelemetryIntakeServiceImpl implements
TelemetryEvidenceGateway {
}
Map<String, String> resource = extractStringMap(log == null ? null
: log.getResource());
Map<String, String> attributes = extractStringMap(log == null ?
null : log.getAttributes());
+ Map<String, String> snapshotIdentities =
+
enrichCanonicalIdentitiesWithEntityContext(canonicalIdentities, entityContext);
TelemetryIdentitySnapshot identitySnapshot = new
TelemetryIdentitySnapshot(
SOURCE_OTLP,
SIGNAL_LOGS,
- canonicalIdentities,
- canonicalIdentities.get("service.name"),
- canonicalIdentities.get("service.namespace"),
- canonicalIdentities.get("deployment.environment.name"),
- canonicalIdentities.get("service.instance.id"),
- canonicalIdentities.get("host.name"),
+ snapshotIdentities,
+ snapshotIdentities.get("service.name"),
+ snapshotIdentities.get("service.namespace"),
+ snapshotIdentities.get("deployment.environment.name"),
+ snapshotIdentities.get("service.instance.id"),
+ snapshotIdentities.get("host.name"),
resolveObservedAt(log)
);
String body = log == null || log.getBody() == null ? null :
String.valueOf(log.getBody());
@@ -1102,25 +1108,17 @@ public class TelemetryIntakeServiceImpl implements
TelemetryEvidenceGateway {
.ifPresent(value -> canonicalIdentities.put(key,
value));
}
}
- long entityId = entityId(entityContext);
- if (entityId > 0) {
- canonicalIdentities.putIfAbsent(HERTZBEAT_ENTITY_ID,
String.valueOf(entityId));
- }
- String workspaceId = entityContext == null ||
entityContext.getEntity() == null
- ? null
- : trimToNull(entityContext.getEntity().getWorkspaceId());
- if (workspaceId != null) {
- canonicalIdentities.putIfAbsent(HERTZBEAT_WORKSPACE_ID,
workspaceId);
- }
+ Map<String, String> enrichedIdentities =
+
enrichCanonicalIdentitiesWithEntityContext(canonicalIdentities, entityContext);
return new TelemetryIdentitySnapshot(
source,
signal,
- canonicalIdentities,
- canonicalIdentities.get("service.name"),
- canonicalIdentities.get("service.namespace"),
- canonicalIdentities.get("deployment.environment.name"),
- canonicalIdentities.get("service.instance.id"),
- canonicalIdentities.get("host.name"),
+ enrichedIdentities,
+ enrichedIdentities.get("service.name"),
+ enrichedIdentities.get("service.namespace"),
+ enrichedIdentities.get("deployment.environment.name"),
+ enrichedIdentities.get("service.instance.id"),
+ enrichedIdentities.get("host.name"),
observedAt
);
}
@@ -1172,10 +1170,40 @@ public class TelemetryIntakeServiceImpl implements
TelemetryEvidenceGateway {
}
}
putCanonicalResourceValue(canonical, values, HERTZBEAT_ENTITY_ID);
+ putCanonicalResourceValue(canonical, values, HERTZBEAT_ENTITY_TYPE);
+ putCanonicalResourceValue(canonical, values, HERTZBEAT_ENTITY_NAME);
putCanonicalResourceValue(canonical, values, HERTZBEAT_WORKSPACE_ID);
return canonical;
}
+ private Map<String, String>
enrichCanonicalIdentitiesWithEntityContext(Map<String, String>
canonicalIdentities,
+
ObservedEntityContext entityContext) {
+ Map<String, String> enriched = new LinkedHashMap<>();
+ if (!CollectionUtils.isEmpty(canonicalIdentities)) {
+ enriched.putAll(canonicalIdentities);
+ }
+ if (entityContext == null || entityContext.getEntity() == null) {
+ return enriched;
+ }
+ long entityId = entityId(entityContext);
+ if (entityId > 0) {
+ enriched.putIfAbsent(HERTZBEAT_ENTITY_ID,
String.valueOf(entityId));
+ }
+ String entityType = trimToNull(entityContext.getEntity().getType());
+ if (entityType != null) {
+ enriched.putIfAbsent(HERTZBEAT_ENTITY_TYPE, entityType);
+ }
+ String entityName =
firstNonBlank(entityContext.getEntity().getDisplayName(),
entityContext.getEntity().getName());
+ if (entityName != null) {
+ enriched.putIfAbsent(HERTZBEAT_ENTITY_NAME, entityName);
+ }
+ String workspaceId =
trimToNull(entityContext.getEntity().getWorkspaceId());
+ if (workspaceId != null) {
+ enriched.putIfAbsent(HERTZBEAT_WORKSPACE_ID, workspaceId);
+ }
+ return enriched;
+ }
+
private void putCanonicalResourceValue(Map<String, String> canonical,
Map<?, ?> values, String key) {
Object value = canonicalResourceValue(values, key);
if (value == null) {
@@ -1210,15 +1238,17 @@ public class TelemetryIntakeServiceImpl implements
TelemetryEvidenceGateway {
if (!matchesEntitySignal(signal.canonicalIdentities(),
entityIdentityKeys, entityIdentityValues, entityId)) {
continue;
}
+ Map<String, String> snapshotIdentities =
+
enrichCanonicalIdentitiesWithEntityContext(signal.canonicalIdentities(),
entityContext);
TelemetryIdentitySnapshot snapshot = new TelemetryIdentitySnapshot(
SOURCE_OTLP,
SIGNAL_METRICS,
- signal.canonicalIdentities(),
- signal.canonicalIdentities().get("service.name"),
- signal.canonicalIdentities().get("service.namespace"),
-
signal.canonicalIdentities().get("deployment.environment.name"),
- signal.canonicalIdentities().get("service.instance.id"),
- signal.canonicalIdentities().get("host.name"),
+ snapshotIdentities,
+ snapshotIdentities.get("service.name"),
+ snapshotIdentities.get("service.namespace"),
+ snapshotIdentities.get("deployment.environment.name"),
+ snapshotIdentities.get("service.instance.id"),
+ snapshotIdentities.get("host.name"),
signal.observedAt()
);
result.add(new MetricEvidence(
@@ -1231,7 +1261,7 @@ public class TelemetryIntakeServiceImpl implements
TelemetryEvidenceGateway {
signal.metricName(),
buildBindingResult(entityContext, SOURCE_OTLP,
SIGNAL_METRICS),
buildMetricCorrelationHint(entityContext, snapshot,
signal.observedAt(), signal.metricName()),
- buildCodeNavigationHint(entityContext,
signal.canonicalIdentities(), filterMetricSignalAttributes(signal.attributes()),
+ buildCodeNavigationHint(entityContext, snapshotIdentities,
filterMetricSignalAttributes(signal.attributes()),
buildFallbackSearchTerms(signal.metricName(),
snapshot.getServiceName()), signal.metricName()),
signal.metricName(),
defaultText(signal.metricName(), "OTLP Metric"),
diff --git
a/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/shared/service/impl/TelemetryIntakeServiceImplTest.java
b/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/shared/service/impl/TelemetryIntakeServiceImplTest.java
index 0ba7c3bec9..331fee5a02 100644
---
a/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/shared/service/impl/TelemetryIntakeServiceImplTest.java
+++
b/hertzbeat-observability/src/test/java/org/apache/hertzbeat/observability/shared/service/impl/TelemetryIntakeServiceImplTest.java
@@ -435,6 +435,12 @@ class TelemetryIntakeServiceImplTest {
assertEquals("checkout failed in repository",
evidence.getFirst().getBody());
assertEquals("trace-1", evidence.getFirst().getTraceId());
assertEquals("checkout",
evidence.getFirst().getIdentitySnapshot().getServiceName());
+ assertEquals("88", evidence.getFirst().getIdentitySnapshot()
+ .getCanonicalIdentities().get("hertzbeat.entity_id"));
+ assertEquals("service", evidence.getFirst().getIdentitySnapshot()
+ .getCanonicalIdentities().get("hertzbeat.entity_type"));
+ assertEquals("checkout", evidence.getFirst().getIdentitySnapshot()
+ .getCanonicalIdentities().get("hertzbeat.entity_name"));
assertTrue(evidence.getFirst().getBindingResult().isBound());
assertEquals(3,
evidence.getFirst().getBindingResult().getMatchedIdentityCount());
verify(logQueryRepository).queryLogs(1000L, 2000L, "trace-1",
"span-1", 20);
@@ -475,6 +481,14 @@ class TelemetryIntakeServiceImplTest {
assertEquals("checkout",
evidence.getFirst().getIdentitySnapshot().getServiceName());
assertEquals("commerce",
evidence.getFirst().getIdentitySnapshot().getServiceNamespace());
assertEquals("prod",
evidence.getFirst().getIdentitySnapshot().getEnvironmentName());
+ assertEquals("88", evidence.getFirst().getIdentitySnapshot()
+ .getCanonicalIdentities().get("hertzbeat.entity_id"));
+ assertEquals("service", evidence.getFirst().getIdentitySnapshot()
+ .getCanonicalIdentities().get("hertzbeat.entity_type"));
+ assertEquals("checkout", evidence.getFirst().getIdentitySnapshot()
+ .getCanonicalIdentities().get("hertzbeat.entity_name"));
+ assertEquals("team-a", evidence.getFirst().getIdentitySnapshot()
+ .getCanonicalIdentities().get("hertzbeat.workspace_id"));
assertTrue(evidence.getFirst().getBindingResult().isBound());
assertEquals(3,
evidence.getFirst().getBindingResult().getMatchedIdentityCount());
}
@@ -540,6 +554,10 @@ class TelemetryIntakeServiceImplTest {
assertEquals(1, logEvidence.size());
assertEquals(88L, logEvidence.getFirst().getEntityId());
assertEquals("checkout failed", logEvidence.getFirst().getBody());
+ assertEquals("88", logEvidence.getFirst().getIdentitySnapshot()
+ .getCanonicalIdentities().get("hertzbeat.entity_id"));
+ assertEquals("service", logEvidence.getFirst().getIdentitySnapshot()
+ .getCanonicalIdentities().get("hertzbeat.entity_type"));
assertEquals("team-a",
logEvidence.getFirst().getResource().get("hertzbeat_workspace_id"));
assertEquals(1, traceSummary.getRecentTraceCount());
assertEquals(1, traceSummary.getRecentErrorTraceCount());
@@ -741,6 +759,12 @@ class TelemetryIntakeServiceImplTest {
assertNotNull(metricEvidence.getAttributes());
assertEquals("partial",
metricEvidence.getAttributes().get("otlp.metric.compatibility"));
assertTrue(metricEvidence.getAttributes().get("otlp.metric.summary.quantiles").contains("\"quantile\":0.95"));
+ assertEquals("102", metricEvidence.getIdentitySnapshot()
+ .getCanonicalIdentities().get("hertzbeat.entity_id"));
+ assertEquals("service", metricEvidence.getIdentitySnapshot()
+ .getCanonicalIdentities().get("hertzbeat.entity_type"));
+ assertEquals("checkout", metricEvidence.getIdentitySnapshot()
+ .getCanonicalIdentities().get("hertzbeat.entity_name"));
assertNotNull(metricEvidence.getOtelContext());
assertTrue(metricEvidence.getOtelContext().contains("partial
support"));
assertTrue(metricEvidence.getOtelContext().contains("Summary
quantiles"));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]