This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch bugfix/als in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 52ef154947a394cf6628789f1994598491eb46ea Author: kezhenxu94 <[email protected]> AuthorDate: Fri Sep 17 16:01:10 2021 +0800 Fix wrong service name when IP is node IP in `k8s-mesh` --- CHANGES.md | 1 + .../receiver/envoy/als/k8s/K8SServiceRegistry.java | 8 ++ .../envoy/als/k8s/KubernetesNodeRegistry.java | 129 +++++++++++++++++++++ 3 files changed, 138 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 6d1e8b7..d9c7e13 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -67,6 +67,7 @@ Release Notes. * Fix `H2EventQueryDAO` doesn't sort data by Event.START_TIME and uses a wrong pagination query. * Fix `LogHandler` of `kafka-fetcher-plugin` cannot recognize namespace. * Improve the speed of writing TiDB by batching the SQL execution. +* Fix wrong service name when IP is node IP in `k8s-mesh`. #### UI diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java index abd3f35..72a7954 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SServiceRegistry.java @@ -66,6 +66,8 @@ public class K8SServiceRegistry { private final EnvoyMetricReceiverConfig config; + private final KubernetesNodeRegistry nodeRegistry; + public K8SServiceRegistry(final EnvoyMetricReceiverConfig config) { this.config = config; @@ -80,6 +82,7 @@ public class K8SServiceRegistry { .setDaemon(true) .build() ); + nodeRegistry = new KubernetesNodeRegistry(); } public void start() throws IOException { @@ -99,6 +102,8 @@ public class K8SServiceRegistry { listenPodEvents(coreV1Api, factory); factory.startAllRegisteredInformers(); + + nodeRegistry.start(); } private void listenServiceEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) { @@ -268,6 +273,9 @@ public class K8SServiceRegistry { } public ServiceMetaInfo findService(final String ip) { + if (nodeRegistry.isNode(ip)) { + return config.serviceMetaInfoFactory().unknown(); + } final ServiceMetaInfo service = ipServiceMetaInfoMap.get(ip); if (isNull(service)) { log.debug("Unknown ip {}, ip -> service is null", ip); diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/KubernetesNodeRegistry.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/KubernetesNodeRegistry.java new file mode 100644 index 0000000..e5dfcc4 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/KubernetesNodeRegistry.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.als.k8s; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.kubernetes.client.informer.ResourceEventHandler; +import io.kubernetes.client.informer.SharedInformerFactory; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.openapi.Configuration; +import io.kubernetes.client.openapi.apis.CoreV1Api; +import io.kubernetes.client.openapi.models.V1Node; +import io.kubernetes.client.openapi.models.V1NodeAddress; +import io.kubernetes.client.openapi.models.V1NodeList; +import io.kubernetes.client.openapi.models.V1NodeStatus; +import io.kubernetes.client.util.Config; +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.util.StringUtil; + +@Slf4j +final class KubernetesNodeRegistry implements ResourceEventHandler<V1Node> { + private final Set<String> nodeIps; + + private final ExecutorService executor; + + public KubernetesNodeRegistry() { + nodeIps = Collections.newSetFromMap(new ConcurrentHashMap<>()); + executor = Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("KubernetesNodeRegistry-%d") + .setDaemon(true) + .build() + ); + } + + public void start() throws IOException { + final ApiClient apiClient = Config.defaultClient(); + apiClient.setHttpClient(apiClient.getHttpClient() + .newBuilder() + .readTimeout(0, TimeUnit.SECONDS) + .build()); + Configuration.setDefaultApiClient(apiClient); + + final CoreV1Api coreV1Api = new CoreV1Api(); + final SharedInformerFactory factory = new SharedInformerFactory(executor); + + listenNodeEvents(coreV1Api, factory); + + factory.startAllRegisteredInformers(); + } + + private void listenNodeEvents(final CoreV1Api coreV1Api, + final SharedInformerFactory factory) { + factory.sharedIndexInformerFor( + params -> coreV1Api.listNodeCall( + null, + null, + null, + null, + null, + null, + params.resourceVersion, + null, + params.timeoutSeconds, + params.watch, + null + ), + V1Node.class, + V1NodeList.class + ).addEventHandler(this); + } + + @Override + public void onAdd(final V1Node node) { + forEachAddress(node, nodeIps::add); + } + + @Override + public void onUpdate(final V1Node oldNode, final V1Node newNode) { + onAdd(newNode); + } + + @Override + public void onDelete(final V1Node node, + final boolean deletedFinalStateUnknown) { + forEachAddress(node, nodeIps::remove); + } + + void forEachAddress(final V1Node node, + final Consumer<String> consume) { + Optional.ofNullable(node) + .map(V1Node::getStatus) + .map(V1NodeStatus::getAddresses) + .ifPresent(addresses -> + addresses.stream() + .map(V1NodeAddress::getAddress) + .filter(StringUtil::isNotBlank) + .forEach(consume) + ); + } + + boolean isNode(final String ip) { + return nodeIps.contains(ip); + } +}
