This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 7727ef5b7c Add Unknown Node when receive Kubernetes peer address is
not aware in current cluster (#12496)
7727ef5b7c is described below
commit 7727ef5b7c0902d7a45987102b8a994f641361fc
Author: mrproliu <[email protected]>
AuthorDate: Thu Aug 1 16:21:28 2024 +0800
Add Unknown Node when receive Kubernetes peer address is not aware in
current cluster (#12496)
---
docs/en/changes/changes.md | 1 +
.../provider/handler/AccessLogServiceHandler.java | 34 +++++++++++++++++-----
.../access_log/expected/dependency-services.yml | 2 +-
3 files changed, 29 insertions(+), 8 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index eff84e5d9a..f1ffad1f2d 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -45,6 +45,7 @@
* BanyanDB: if the model column is already a `@BanyanDB.TimestampColumn`, set
`@BanyanDB.NoIndexing` on it to reduce indexes.
* BanyanDB: stream sort-by `time` query, use internal time-series rather than
`index` to improve the query performance.
* Bump up graphql-java to 21.5.
+* Add Unknown Node when receive Kubernetes peer address is not aware in
current cluster.
#### UI
diff --git
a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/AccessLogServiceHandler.java
b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/AccessLogServiceHandler.java
index 189afa5494..9d6b769b22 100644
---
a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/AccessLogServiceHandler.java
+++
b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/AccessLogServiceHandler.java
@@ -46,6 +46,7 @@ import
org.apache.skywalking.apm.network.ebpf.accesslog.v3.IPAddress;
import
org.apache.skywalking.apm.network.ebpf.accesslog.v3.KubernetesProcessAddress;
import org.apache.skywalking.library.kubernetes.ObjectID;
import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
+import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
@@ -80,6 +81,10 @@ import java.util.stream.Stream;
@Slf4j
public class AccessLogServiceHandler extends
EBPFAccessLogServiceGrpc.EBPFAccessLogServiceImplBase {
+ protected static final KubernetesProcessAddress UNKNOWN_ADDRESS =
KubernetesProcessAddress.newBuilder()
+ .setServiceName(Const.UNKNOWN)
+ .setPodName(Const.UNKNOWN)
+ .build();
private final SourceReceiver sourceReceiver;
private final NamingControl namingControl;
@@ -171,7 +176,8 @@ public class AccessLogServiceHandler extends
EBPFAccessLogServiceGrpc.EBPFAccess
private void dispatchKernelLog(NodeInfo node, ConnectionInfo connection,
AccessLogKernelLog kernelLog) {
final List<K8SMetrics> metrics = Arrays.asList(connection.toService(),
connection.toServiceInstance(),
- connection.toServiceRelation(),
connection.toServiceInstanceRelation());
+ connection.toServiceRelation(),
connection.toServiceInstanceRelation())
+ .stream().filter(Objects::nonNull).collect(Collectors.toList());
for (K8SMetrics metric : metrics) {
switch (kernelLog.getOperationCase()) {
@@ -464,20 +470,24 @@ public class AccessLogServiceHandler extends
EBPFAccessLogServiceGrpc.EBPFAccess
protected KubernetesProcessAddress buildKubernetesAddressByIP(NodeInfo
nodeInfo, IPAddress ipAddress) {
final ObjectID pod =
K8sInfoRegistry.getInstance().findPodByIP(ipAddress.getHost());
if (pod == ObjectID.EMPTY) {
- return null;
- }
- if (nodeInfo.shouldExcludeNamespace(pod.namespace())) {
- log.debug("Should exclude the namespace[{}] traffic, pod: {}",
pod.namespace(), pod.name());
- return null;
+ // if cannot found the address, then return the unknown address
+ log.debug("building unknown address by ip: {}:{}",
ipAddress.getHost(), ipAddress.getPort());
+ return buildUnknownAddress();
}
final ObjectID serviceName =
K8sInfoRegistry.getInstance().findService(pod.namespace(), pod.name());
if (serviceName == ObjectID.EMPTY) {
- return null;
+ // if the pod have been found, but the service name cannot found,
then still return unknown address
+ log.debug("building unknown address by pod: {}:{}", pod.name(),
ipAddress.getPort());
+ return buildUnknownAddress();
}
return buildRemoteAddress(nodeInfo, serviceName, pod);
}
+ protected KubernetesProcessAddress buildUnknownAddress() {
+ return UNKNOWN_ADDRESS;
+ }
+
protected KubernetesProcessAddress buildRemoteAddress(NodeInfo nodeInfo,
ObjectID service, ObjectID pod) {
String serviceName = service.name() + "." + service.namespace();
if (StringUtil.isNotEmpty(nodeInfo.getClusterName())) {
@@ -511,6 +521,10 @@ public class AccessLogServiceHandler extends
EBPFAccessLogServiceGrpc.EBPFAccess
this.nodeInfo = nodeInfo;
this.protocolType = connection.getProtocol();
this.valid = generateIsValid();
+ if (log.isDebugEnabled() &&
+ (Objects.equals(this.local, buildUnknownAddress()) ||
Objects.equals(this.remote, buildUnknownAddress()))) {
+ log.debug("found unknown connection: {}", connection);
+ }
}
private KubernetesProcessAddress buildAddress(NodeInfo nodeInfo,
ConnectionAddress address) {
@@ -542,6 +556,9 @@ public class AccessLogServiceHandler extends
EBPFAccessLogServiceGrpc.EBPFAccess
}
public K8SService toService() {
+ if (Objects.equals(local, buildUnknownAddress())) {
+ return null;
+ }
final K8SService service = new K8SService();
service.setName(buildServiceNameByAddress(nodeInfo, local));
service.setLayer(Layer.K8S_SERVICE);
@@ -550,6 +567,9 @@ public class AccessLogServiceHandler extends
EBPFAccessLogServiceGrpc.EBPFAccess
}
public K8SServiceInstance toServiceInstance() {
+ if (Objects.equals(local, buildUnknownAddress())) {
+ return null;
+ }
final K8SServiceInstance serviceInstance = new
K8SServiceInstance();
serviceInstance.setServiceName(buildServiceNameByAddress(nodeInfo,
local));
serviceInstance.setServiceInstanceName(buildServiceInstanceName(local));
diff --git
a/test/e2e-v2/cases/profiling/ebpf/access_log/expected/dependency-services.yml
b/test/e2e-v2/cases/profiling/ebpf/access_log/expected/dependency-services.yml
index fb4b5554bf..f54beebc4e 100644
---
a/test/e2e-v2/cases/profiling/ebpf/access_log/expected/dependency-services.yml
+++
b/test/e2e-v2/cases/profiling/ebpf/access_log/expected/dependency-services.yml
@@ -17,7 +17,7 @@ nodes:
{{- contains .nodes }}
- id: {{ b64enc "productpage.default"}}.1
name: productpage.default
- type: null
+ type: http
isreal: true
- id: {{ b64enc "details.default"}}.1
name: details.default