This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch k8s/service-registry
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 62017449515dc6b1318a098485b56e9e34d46f84
Author: kezhenxu94 <kezhenx...@163.com>
AuthorDate: Fri Oct 23 12:06:38 2020 +0800

    Improve K8S ALS analysis
    
    The current implementation of the Envoy ALS K8S analysis is based on the 
hierarchy pod -> StatefulSet -> Deployment, StatefulSet, or others. This is 
unintuitive and differs from the Istio Kubernetes registry.
    
    The new path is pod -> endpoint -> service, and it uses the Informer API 
instead of polling the raw Kubernetes API.
---
 .github/workflows/e2e.istio.yaml                   |  12 +-
 docker/oap/log4j2.xml                              |   2 +-
 .../receiver/envoy/als/K8SServiceRegistry.java     | 211 ++++++++++++++++
 .../envoy/als/K8sALSServiceMeshHTTPAnalysis.java   | 273 ++++++---------------
 .../server/receiver/envoy/als/ServiceMetaInfo.java |   9 +
 .../receiver/envoy/als/K8sHTTPAnalysisTest.java    |  17 +-
 test/e2e-mesh/e2e-istio/scripts/istio.sh           |   3 +-
 7 files changed, 312 insertions(+), 215 deletions(-)

diff --git a/.github/workflows/e2e.istio.yaml b/.github/workflows/e2e.istio.yaml
index 28db870..67d0b85 100644
--- a/.github/workflows/e2e.istio.yaml
+++ b/.github/workflows/e2e.istio.yaml
@@ -21,14 +21,16 @@ on:
   push:
     branches:
       - master
+      - k8s/service-registry
 
 env:
   SKIP_TEST: true
   ES_VERSION: es7
   ISTIO_VERSION: 1.7.1
+  K8S_VER: 1.17.0
   TAG: ${{ github.sha }}
   SCRIPTS_DIR: test/e2e-mesh/e2e-istio/scripts
-  SW_OAP_BASE_IMAGE: openjdk:8-jre-alpine
+  SW_OAP_BASE_IMAGE: openjdk:11-jdk
 
 jobs:
   als:
@@ -62,17 +64,21 @@ jobs:
         run: |
           git clone https://github.com/apache/skywalking-kubernetes.git
           cd skywalking-kubernetes
-          git reset --hard 419cd1aed8bb4ad972208e5a031527a25d2ae690
+          git reset --hard 80a18d1d475c82ccaace87f2dbe1c0bf22f2dedf
           cd chart
           helm dep up skywalking
           helm -n istio-system install skywalking skywalking \
                --set fullnameOverride=skywalking \
                --set elasticsearch.replicas=1 \
+               --set elasticsearch.minimumMasterNodes=1 \
                --set oap.env.SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS=k8s-mesh \
                --set oap.envoy.als.enabled=true \
                --set oap.replicas=1 \
+               --set ui.image.repository=skywalking/ui \
+               --set ui.image.tag=$TAG \
                --set oap.image.tag=$TAG \
-               --set oap.image.repository=skywalking/oap
+               --set oap.image.repository=skywalking/oap \
+               --set oap.storageType=elasticsearch
           kubectl -n istio-system get pods
 
           sleep 3
diff --git a/docker/oap/log4j2.xml b/docker/oap/log4j2.xml
index eb69a89..a119e1e 100644
--- a/docker/oap/log4j2.xml
+++ b/docker/oap/log4j2.xml
@@ -29,7 +29,7 @@
         <logger name="org.elasticsearch.common.network.IfConfig" level="INFO"/>
         <logger name="io.grpc.netty" level="INFO"/>
         <logger 
name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/>
-        <Root level="INFO">
+        <Root level="DEBUG">
             <AppenderRef ref="Console"/>
         </Root>
     </Loggers>
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8SServiceRegistry.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8SServiceRegistry.java
new file mode 100644
index 0000000..97c233c
--- /dev/null
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8SServiceRegistry.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.envoy.als;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.kubernetes.client.informer.ResourceEventHandler;
+import io.kubernetes.client.informer.SharedInformerFactory;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1Endpoints;
+import io.kubernetes.client.openapi.models.V1EndpointsList;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1Pod;
+import io.kubernetes.client.openapi.models.V1PodList;
+import io.kubernetes.client.util.Config;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+
+import static java.util.Objects.isNull;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * In-memory registry that resolves an IP address to the {@link ServiceMetaInfo} registered
+ * for it.
+ *
+ * <p>The registry is filled by two informers: {@code Endpoints} events provide the service
+ * name of an entry, {@code Pod} events provide its service instance name and its tags.
+ */
+@Slf4j
+class K8SServiceRegistry {
+    /**
+     * IP -> service meta info. Written by the informer event handlers and read by
+     * {@link #findService(String)} and {@link #isEmpty()}.
+     */
+    final Map<String, ServiceMetaInfo> ipServiceMap;
+
+    /**
+     * Daemon thread pool handed to the {@link SharedInformerFactory}.
+     */
+    final ExecutorService executor;
+
+    K8SServiceRegistry() {
+        ipServiceMap = new ConcurrentHashMap<>();
+        executor = Executors.newCachedThreadPool(
+            new ThreadFactoryBuilder()
+                .setNameFormat("K8SServiceRegistry-%d")
+                .setDaemon(true)
+                .build()
+        );
+    }
+
+    /**
+     * Creates the Kubernetes client, registers the endpoints and pod informers and starts them.
+     *
+     * @throws IOException propagated from {@code Config.defaultClient()}
+     */
+    void start() throws IOException {
+        final ApiClient apiClient = Config.defaultClient();
+        // NOTE(review): a read timeout of 0 is expected to disable the timeout so that the
+        // long-lived watch requests are not aborted -- confirm against the HTTP client docs.
+        apiClient.setHttpClient(apiClient.getHttpClient()
+                                         .newBuilder()
+                                         .readTimeout(0, TimeUnit.SECONDS)
+                                         .build());
+
+        final CoreV1Api coreV1Api = new CoreV1Api(apiClient);
+        final SharedInformerFactory factory = new SharedInformerFactory(executor);
+
+        listenEndpointsEvents(coreV1Api, factory);
+        listenPodEvents(coreV1Api, factory);
+
+        factory.startAllRegisteredInformers();
+    }
+
+    /**
+     * Registers an informer on the endpoints of all namespaces that keeps the service names in
+     * {@link #ipServiceMap} up to date.
+     */
+    private void listenEndpointsEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) {
+        factory.sharedIndexInformerFor(
+            params -> coreV1Api.listEndpointsForAllNamespacesCall(
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                params.resourceVersion,
+                params.timeoutSeconds,
+                params.watch,
+                null
+            ),
+            V1Endpoints.class,
+            V1EndpointsList.class
+        ).addEventHandler(new ResourceEventHandler<V1Endpoints>() {
+            @Override
+            public void onAdd(final V1Endpoints endpoints) {
+                addEndpoints(endpoints);
+            }
+
+            @Override
+            public void onUpdate(final V1Endpoints oldEndpoints, final V1Endpoints newEndpoints) {
+                // Only drop the IPs that actually left the endpoints: removing and re-creating
+                // the surviving ones would discard the instance name and tags set by pod events.
+                final List<String> survivingIps = ipsOf(newEndpoints);
+                ipsOf(oldEndpoints).stream()
+                                   .filter(ip -> !survivingIps.contains(ip))
+                                   .forEach(ipServiceMap::remove);
+                addEndpoints(newEndpoints);
+            }
+
+            @Override
+            public void onDelete(final V1Endpoints endpoints, final boolean deletedFinalStateUnknown) {
+                removeEndpoints(endpoints);
+            }
+        });
+    }
+
+    /**
+     * Registers an informer on the pods of all namespaces that keeps the service instance names
+     * and tags in {@link #ipServiceMap} up to date.
+     */
+    private void listenPodEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) {
+        factory.sharedIndexInformerFor(
+            params -> coreV1Api.listPodForAllNamespacesCall(
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                params.resourceVersion,
+                params.timeoutSeconds,
+                params.watch,
+                null
+            ),
+            V1Pod.class,
+            V1PodList.class
+        ).addEventHandler(new ResourceEventHandler<V1Pod>() {
+            @Override
+            public void onAdd(final V1Pod pod) {
+                addPod(pod);
+            }
+
+            @Override
+            public void onUpdate(final V1Pod oldPod, final V1Pod newPod) {
+                // Refresh the entry in place while the IP is unchanged: removing it first would
+                // also throw away the service name contributed by the endpoints informer.
+                // NOTE(review): presumably isComplete() needs both names -- confirm in ServiceMetaInfo.
+                final String newIp = podIp(newPod).orElse(null);
+                podIp(oldPod).filter(oldIp -> !oldIp.equals(newIp))
+                             .ifPresent(ipServiceMap::remove);
+                addPod(newPod);
+            }
+
+            @Override
+            public void onDelete(final V1Pod pod, final boolean deletedFinalStateUnknown) {
+                removePod(pod);
+            }
+        });
+    }
+
+    /**
+     * Removes the entry registered for the pod's IP; does nothing when the pod has no status
+     * or no IP (the map does not accept null keys).
+     */
+    private void removePod(final V1Pod pod) {
+        podIp(pod).ifPresent(ipServiceMap::remove);
+    }
+
+    /**
+     * Sets the service instance name ({@code <pod name>.<namespace>}) and the tags of the entry
+     * registered for the pod's IP, creating the entry when absent. Pods without a status or
+     * without an IP are skipped.
+     */
+    private void addPod(final V1Pod pod) {
+        podIp(pod).ifPresent(ip -> {
+            // Resolve the metadata before touching the map so a failure leaves no empty entry.
+            final V1ObjectMeta podMeta = requireNonNull(pod.getMetadata());
+            final ServiceMetaInfo service = ipServiceMap.computeIfAbsent(ip, unused -> new ServiceMetaInfo());
+
+            service.setServiceInstanceName(String.format("%s.%s", podMeta.getName(), podMeta.getNamespace()));
+            service.setTags(transformLabelsToTags(podMeta.getLabels()));
+        });
+    }
+
+    /**
+     * Sets the endpoints' name as service name on the entry of every address IP, creating the
+     * entries when absent. Endpoints without subsets or addresses contribute nothing.
+     */
+    private void addEndpoints(final V1Endpoints endpoints) {
+        final String serviceName = requireNonNull(endpoints.getMetadata()).getName();
+
+        ipsOf(endpoints).forEach(
+            ip -> ipServiceMap.computeIfAbsent(ip, unused -> new ServiceMetaInfo()).setServiceName(serviceName)
+        );
+    }
+
+    /**
+     * Removes the entries of every address IP of the given endpoints.
+     */
+    private void removeEndpoints(final V1Endpoints endpoints) {
+        ipsOf(endpoints).forEach(ipServiceMap::remove);
+    }
+
+    /**
+     * @return the pod IP, or empty when the pod has no status or no IP assigned
+     */
+    private static Optional<String> podIp(final V1Pod pod) {
+        return Optional.ofNullable(pod.getStatus()).map(status -> status.getPodIP());
+    }
+
+    /**
+     * @return the non-null IPs of all addresses in all subsets; a missing subset list or a
+     * missing address list is treated as empty instead of failing
+     */
+    private static List<String> ipsOf(final V1Endpoints endpoints) {
+        return Optional.ofNullable(endpoints.getSubsets())
+                       .orElseGet(Collections::emptyList)
+                       .stream()
+                       .flatMap(subset -> Optional.ofNullable(subset.getAddresses())
+                                                  .orElseGet(Collections::emptyList)
+                                                  .stream())
+                       .map(address -> address.getIp())
+                       .filter(ip -> ip != null)
+                       .collect(Collectors.toList());
+    }
+
+    /**
+     * Converts pod labels to tags.
+     *
+     * @param labels the labels, may be {@code null}
+     * @return one key/value tag per label, or an empty list when {@code labels} is {@code null}
+     */
+    private List<ServiceMetaInfo.KeyValue> transformLabelsToTags(final Map<String, String> labels) {
+        if (isNull(labels)) {
+            return Collections.emptyList();
+        }
+        return labels.entrySet()
+                     .stream()
+                     .map(each -> new ServiceMetaInfo.KeyValue(each.getKey(), each.getValue()))
+                     .collect(Collectors.toList());
+    }
+
+    /**
+     * Looks up the service registered for an IP.
+     *
+     * @param ip the IP address to resolve
+     * @return the registered service, or {@link ServiceMetaInfo#UNKNOWN} when there is no entry
+     * for the IP or the entry is not complete
+     */
+    ServiceMetaInfo findService(final String ip) {
+        final ServiceMetaInfo service = ipServiceMap.getOrDefault(ip, ServiceMetaInfo.UNKNOWN);
+        if (!service.isComplete()) {
+            log.debug("Unknown ip {}, ip -> service is null", ip);
+            return ServiceMetaInfo.UNKNOWN;
+        }
+        return service;
+    }
+
+    /**
+     * @return {@code true} when no IP has been registered
+     */
+    boolean isEmpty() {
+        return ipServiceMap.isEmpty();
+    }
+}
diff --git 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
index 6d1e3dc..807ba06 100644
--- 
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
+++ 
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sALSServiceMeshHTTPAnalysis.java
@@ -19,7 +19,6 @@
 package org.apache.skywalking.oap.server.receiver.envoy.als;
 
 import com.google.common.base.Strings;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.Duration;
 import com.google.protobuf.Timestamp;
 import io.envoyproxy.envoy.api.v2.core.Address;
@@ -31,58 +30,32 @@ import 
io.envoyproxy.envoy.data.accesslog.v2.HTTPRequestProperties;
 import io.envoyproxy.envoy.data.accesslog.v2.HTTPResponseProperties;
 import io.envoyproxy.envoy.data.accesslog.v2.TLSProperties;
 import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
-import io.kubernetes.client.openapi.ApiClient;
-import io.kubernetes.client.openapi.apis.CoreV1Api;
-import io.kubernetes.client.openapi.apis.ExtensionsV1beta1Api;
-import io.kubernetes.client.openapi.models.V1ObjectMeta;
-import io.kubernetes.client.openapi.models.V1OwnerReference;
-import io.kubernetes.client.openapi.models.V1Pod;
-import io.kubernetes.client.openapi.models.V1PodList;
-import io.kubernetes.client.util.Config;
 import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import lombok.AccessLevel;
-import lombok.Getter;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
 import org.apache.skywalking.apm.network.common.v3.DetectPoint;
 import org.apache.skywalking.apm.network.servicemesh.v3.Protocol;
 import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
 import org.apache.skywalking.oap.server.core.source.Source;
 import 
org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Analysis log based on ingress and mesh scenarios.
  */
+@Slf4j
 public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(K8sALSServiceMeshHTTPAnalysis.class);
-
-    private static final String VALID_PHASE = "Running";
-
     private static final String NON_TLS = "NONE";
 
     private static final String M_TLS = "mTLS";
 
     private static final String TLS = "TLS";
 
-    @Getter(AccessLevel.PROTECTED)
-    private final AtomicReference<Map<String, ServiceMetaInfo>> ipServiceMap = 
new AtomicReference<>();
-
-    private final ScheduledExecutorService executorService = 
Executors.newScheduledThreadPool(
-        1, new ThreadFactoryBuilder()
-            .setNameFormat("load-pod-%d")
-            .setDaemon(true)
-            .build());
+    protected K8SServiceRegistry serviceRegistry;
 
     @Override
     public String name() {
@@ -90,83 +63,15 @@ public class K8sALSServiceMeshHTTPAnalysis implements 
ALSHTTPAnalysis {
     }
 
     @Override
+    @SneakyThrows
     public void init(EnvoyMetricReceiverConfig config) {
-        executorService.scheduleAtFixedRate(this::loadPodInfo, 0, 15, 
TimeUnit.SECONDS);
-    }
-
-    private boolean invalidPodList() {
-        Map<String, ServiceMetaInfo> map = ipServiceMap.get();
-        return map == null || map.isEmpty();
-    }
-
-    private void loadPodInfo() {
-        try {
-            ApiClient client = Config.defaultClient();
-            CoreV1Api api = new CoreV1Api(client);
-
-            V1PodList list = api.listPodForAllNamespaces(null, null, null, 
null, null, null, null, null, null);
-            Map<String, ServiceMetaInfo> ipMap = new 
HashMap<>(list.getItems().size());
-            long startTime = System.nanoTime();
-            for (V1Pod item : list.getItems()) {
-                if (!item.getStatus().getPhase().equals(VALID_PHASE)) {
-                    LOGGER.debug("Invalid pod {} is not in a valid phase {}", 
item.getMetadata()
-                                                                               
   .getName(), item.getStatus()
-                                                                               
                   .getPhase());
-                    continue;
-                }
-                if 
(item.getStatus().getPodIP().equals(item.getStatus().getHostIP())) {
-                    LOGGER.debug(
-                        "Pod {}.{} is removed because hostIP and podIP are 
identical ", item.getMetadata()
-                                                                               
             .getName(),
-                        item.getMetadata()
-                            .getNamespace()
-                    );
-                    continue;
-                }
-                ipMap.put(item.getStatus().getPodIP(), 
createServiceMetaInfo(item.getMetadata()));
-            }
-            LOGGER.info("Load {} pods in {}ms", ipMap.size(), 
(System.nanoTime() - startTime) / 1_000_000);
-            ipServiceMap.set(ipMap);
-        } catch (Throwable th) {
-            LOGGER.error("run load pod error", th);
-        }
-    }
-
-    private ServiceMetaInfo createServiceMetaInfo(final V1ObjectMeta podMeta) {
-        ExtensionsV1beta1Api extensionsApi = new ExtensionsV1beta1Api();
-        DependencyResource dr = new DependencyResource(podMeta);
-        DependencyResource meta = dr.getOwnerResource(
-            "ReplicaSet", ownerReference -> 
extensionsApi.readNamespacedReplicaSet(
-                ownerReference
-                    .getName(), podMeta.getNamespace(), "", true, true)
-                                                         .getMetadata());
-        ServiceMetaInfo result = new ServiceMetaInfo();
-        if (meta.getMetadata().getOwnerReferences() != null && 
meta.getMetadata().getOwnerReferences().size() > 0) {
-            V1OwnerReference owner = 
meta.getMetadata().getOwnerReferences().get(0);
-            result.setServiceName(String.format("%s.%s", owner.getName(), 
meta.getMetadata().getNamespace()));
-        } else {
-            result.setServiceName(String.format("%s.%s", 
meta.getMetadata().getName(), meta.getMetadata()
-                                                                               
            .getNamespace()));
-        }
-        result.setServiceInstanceName(String.format("%s.%s", 
podMeta.getName(), podMeta.getNamespace()));
-        result.setTags(transformLabelsToTags(podMeta.getLabels()));
-        return result;
-    }
-
-    private List<ServiceMetaInfo.KeyValue> transformLabelsToTags(final 
Map<String, String> labels) {
-        if (labels == null || labels.size() < 1) {
-            return Collections.emptyList();
-        }
-        List<ServiceMetaInfo.KeyValue> result = new ArrayList<>(labels.size());
-        for (Map.Entry<String, String> each : labels.entrySet()) {
-            result.add(new ServiceMetaInfo.KeyValue(each.getKey(), 
each.getValue()));
-        }
-        return result;
+        serviceRegistry = new K8SServiceRegistry();
+        serviceRegistry.start();
     }
 
     @Override
     public List<Source> analysis(StreamAccessLogsMessage.Identifier 
identifier, HTTPAccessLogEntry entry, Role role) {
-        if (invalidPodList()) {
+        if (serviceRegistry.isEmpty()) {
             return Collections.emptyList();
         }
         switch (role) {
@@ -209,95 +114,74 @@ public class K8sALSServiceMeshHTTPAnalysis implements 
ALSHTTPAnalysis {
                 boolean status = responseCode >= 200 && responseCode < 400;
 
                 Address downstreamRemoteAddress = 
properties.getDownstreamRemoteAddress();
-                ServiceMetaInfo downstreamService = find(
-                    downstreamRemoteAddress.getSocketAddress()
-                                           .getAddress(), 
downstreamRemoteAddress.getSocketAddress()
-                                                                               
  .getPortValue());
+                ServiceMetaInfo downstreamService = 
find(downstreamRemoteAddress.getSocketAddress().getAddress());
                 Address downstreamLocalAddress = 
properties.getDownstreamLocalAddress();
-                ServiceMetaInfo localService = find(
-                    downstreamLocalAddress.getSocketAddress()
-                                          .getAddress(), 
downstreamLocalAddress.getSocketAddress()
-                                                                               
.getPortValue());
+                ServiceMetaInfo localService = 
find(downstreamLocalAddress.getSocketAddress().getAddress());
                 String tlsMode = parseTLS(properties.getTlsProperties());
+
+                ServiceMeshMetric.Builder metric = null;
                 if (cluster.startsWith("inbound|")) {
                     // Server side
                     if (downstreamService.equals(ServiceMetaInfo.UNKNOWN)) {
                         // Ingress -> sidecar(server side)
                         // Mesh telemetry without source, the relation would 
be generated.
-                        ServiceMeshMetric.Builder metric = 
ServiceMeshMetric.newBuilder()
-                                                                            
.setStartTime(startTime)
-                                                                            
.setEndTime(startTime + duration)
-                                                                            
.setDestServiceName(
-                                                                               
 localService.getServiceName())
-                                                                            
.setDestServiceInstance(
-                                                                               
 localService.getServiceInstanceName())
-                                                                            
.setEndpoint(endpoint)
-                                                                            
.setLatency((int) duration)
-                                                                            
.setResponseCode(
-                                                                               
 Math.toIntExact(responseCode))
-                                                                            
.setStatus(status)
-                                                                            
.setProtocol(protocol)
-                                                                            
.setTlsMode(tlsMode)
-                                                                            
.setDetectPoint(DetectPoint.server);
-
-                        LOGGER.debug("Transformed ingress->sidecar inbound 
mesh metric {}", metric);
-                        forward(metric);
+                        metric = ServiceMeshMetric.newBuilder()
+                                                  .setStartTime(startTime)
+                                                  .setEndTime(startTime + 
duration)
+                                                  
.setDestServiceName(localService.getServiceName())
+                                                  
.setDestServiceInstance(localService.getServiceInstanceName())
+                                                  .setEndpoint(endpoint)
+                                                  .setLatency((int) duration)
+                                                  
.setResponseCode(Math.toIntExact(responseCode))
+                                                  .setStatus(status)
+                                                  .setProtocol(protocol)
+                                                  .setTlsMode(tlsMode)
+                                                  
.setDetectPoint(DetectPoint.server);
+
+                        log.debug("Transformed ingress->sidecar inbound mesh 
metric {}", metric);
                     } else {
                         // sidecar -> sidecar(server side)
-                        ServiceMeshMetric.Builder metric = 
ServiceMeshMetric.newBuilder()
-                                                                            
.setStartTime(startTime)
-                                                                            
.setEndTime(startTime + duration)
-                                                                            
.setSourceServiceName(
-                                                                               
 downstreamService.getServiceName())
-                                                                            
.setSourceServiceInstance(
-                                                                               
 downstreamService.getServiceInstanceName())
-                                                                            
.setDestServiceName(
-                                                                               
 localService.getServiceName())
-                                                                            
.setDestServiceInstance(
-                                                                               
 localService.getServiceInstanceName())
-                                                                            
.setEndpoint(endpoint)
-                                                                            
.setLatency((int) duration)
-                                                                            
.setResponseCode(
-                                                                               
 Math.toIntExact(responseCode))
-                                                                            
.setStatus(status)
-                                                                            
.setProtocol(protocol)
-                                                                            
.setTlsMode(tlsMode)
-                                                                            
.setDetectPoint(DetectPoint.server);
-
-                        LOGGER.debug("Transformed sidecar->sidecar(server 
side) inbound mesh metric {}", metric);
-                        forward(metric);
+                        metric = ServiceMeshMetric.newBuilder()
+                                                  .setStartTime(startTime)
+                                                  .setEndTime(startTime + 
duration)
+                                                  
.setSourceServiceName(downstreamService.getServiceName())
+                                                  
.setSourceServiceInstance(downstreamService.getServiceInstanceName())
+                                                  
.setDestServiceName(localService.getServiceName())
+                                                  
.setDestServiceInstance(localService.getServiceInstanceName())
+                                                  .setEndpoint(endpoint)
+                                                  .setLatency((int) duration)
+                                                  
.setResponseCode(Math.toIntExact(responseCode))
+                                                  .setStatus(status)
+                                                  .setProtocol(protocol)
+                                                  .setTlsMode(tlsMode)
+                                                  
.setDetectPoint(DetectPoint.server);
+
+                        log.debug("Transformed sidecar->sidecar(server side) 
inbound mesh metric {}", metric);
                     }
                 } else if (cluster.startsWith("outbound|")) {
                     // sidecar(client side) -> sidecar
                     Address upstreamRemoteAddress = 
properties.getUpstreamRemoteAddress();
-                    ServiceMetaInfo destService = find(
-                        upstreamRemoteAddress.getSocketAddress()
-                                             .getAddress(), 
upstreamRemoteAddress.getSocketAddress()
-                                                                               
  .getPortValue());
-
-                    ServiceMeshMetric.Builder metric = 
ServiceMeshMetric.newBuilder()
-                                                                        
.setStartTime(startTime)
-                                                                        
.setEndTime(startTime + duration)
-                                                                        
.setSourceServiceName(
-                                                                            
downstreamService.getServiceName())
-                                                                        
.setSourceServiceInstance(
-                                                                            
downstreamService.getServiceInstanceName())
-                                                                        
.setDestServiceName(
-                                                                            
destService.getServiceName())
-                                                                        
.setDestServiceInstance(
-                                                                            
destService.getServiceInstanceName())
-                                                                        
.setEndpoint(endpoint)
-                                                                        
.setLatency((int) duration)
-                                                                        
.setResponseCode(Math.toIntExact(responseCode))
-                                                                        
.setStatus(status)
-                                                                        
.setProtocol(protocol)
-                                                                        
.setTlsMode(tlsMode)
-                                                                        
.setDetectPoint(DetectPoint.client);
-
-                    LOGGER.debug("Transformed sidecar->sidecar(server side) 
inbound mesh metric {}", metric);
-                    forward(metric);
-
+                    ServiceMetaInfo destService = 
find(upstreamRemoteAddress.getSocketAddress().getAddress());
+
+                    metric = ServiceMeshMetric.newBuilder()
+                                              .setStartTime(startTime)
+                                              .setEndTime(startTime + duration)
+                                              
.setSourceServiceName(downstreamService.getServiceName())
+                                              
.setSourceServiceInstance(downstreamService.getServiceInstanceName())
+                                              
.setDestServiceName(destService.getServiceName())
+                                              
.setDestServiceInstance(destService.getServiceInstanceName())
+                                              .setEndpoint(endpoint)
+                                              .setLatency((int) duration)
+                                              
.setResponseCode(Math.toIntExact(responseCode))
+                                              .setStatus(status)
+                                              .setProtocol(protocol)
+                                              .setTlsMode(tlsMode)
+                                              
.setDetectPoint(DetectPoint.client);
+
+                    log.debug("Transformed sidecar->sidecar(server side) 
inbound mesh metric {}", metric);
                 }
+
+                Optional.ofNullable(metric).ifPresent(this::forward);
             }
         }
         return sources;
@@ -308,11 +192,11 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
             return NON_TLS;
         }
         if 
(Strings.isNullOrEmpty(Optional.ofNullable(properties.getLocalCertificateProperties())
-            
.orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject()))
 {
+                                          
.orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject()))
 {
             return NON_TLS;
         }
         if 
(Strings.isNullOrEmpty(Optional.ofNullable(properties.getPeerCertificateProperties())
-            
.orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject()))
 {
+                                          
.orElse(TLSProperties.CertificateProperties.newBuilder().build()).getSubject()))
 {
             return TLS;
         }
         return M_TLS;
@@ -326,14 +210,10 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
             Address upstreamRemoteAddress = 
properties.getUpstreamRemoteAddress();
             if (downstreamLocalAddress != null && downstreamRemoteAddress != 
null && upstreamRemoteAddress != null) {
                 SocketAddress downstreamRemoteAddressSocketAddress = 
downstreamRemoteAddress.getSocketAddress();
-                ServiceMetaInfo outside = find(
-                    downstreamRemoteAddressSocketAddress.getAddress(), 
downstreamRemoteAddressSocketAddress
-                        .getPortValue());
+                ServiceMetaInfo outside = 
find(downstreamRemoteAddressSocketAddress.getAddress());
 
                 SocketAddress downstreamLocalAddressSocketAddress = 
downstreamLocalAddress.getSocketAddress();
-                ServiceMetaInfo ingress = find(
-                    downstreamLocalAddressSocketAddress.getAddress(), 
downstreamLocalAddressSocketAddress
-                        .getPortValue());
+                ServiceMetaInfo ingress = 
find(downstreamLocalAddressSocketAddress.getAddress());
 
                 long startTime = formatAsLong(properties.getStartTime());
                 long duration = 
formatAsLong(properties.getTimeToLastDownstreamTxByte());
@@ -375,13 +255,11 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
                                                                     
.setTlsMode(tlsMode)
                                                                     
.setDetectPoint(DetectPoint.server);
 
-                LOGGER.debug("Transformed ingress inbound mesh metric {}", metric);
+                log.debug("Transformed ingress inbound mesh metric {}", metric);
                 forward(metric);
 
                 SocketAddress upstreamRemoteAddressSocketAddress = 
upstreamRemoteAddress.getSocketAddress();
-                ServiceMetaInfo targetService = find(
-                    upstreamRemoteAddressSocketAddress.getAddress(), 
upstreamRemoteAddressSocketAddress
-                        .getPortValue());
+                ServiceMetaInfo targetService = 
find(upstreamRemoteAddressSocketAddress.getAddress());
 
                 long outboundStartTime = startTime + 
formatAsLong(properties.getTimeToFirstUpstreamTxByte());
                 long outboundEndTime = startTime + 
formatAsLong(properties.getTimeToLastUpstreamRxByte());
@@ -409,7 +287,7 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
                                                                             
.setTlsMode(NON_TLS)
                                                                             
.setDetectPoint(DetectPoint.client);
 
-                LOGGER.debug("Transformed ingress outbound mesh metric {}", outboundMetric);
+                log.debug("Transformed ingress outbound mesh metric {}", outboundMetric);
                 forward(outboundMetric);
             }
         }
@@ -435,17 +313,8 @@ public class K8sALSServiceMeshHTTPAnalysis implements ALSHTTPAnalysis {
     /**
      * @return found service info, or {@link ServiceMetaInfo#UNKNOWN} to represent not found.
      */
-    protected ServiceMetaInfo find(String ip, int port) {
-        Map<String, ServiceMetaInfo> map = ipServiceMap.get();
-        if (map == null) {
-            LOGGER.debug("Unknown ip {}, ip -> service is null", ip);
-            return ServiceMetaInfo.UNKNOWN;
-        }
-        if (map.containsKey(ip)) {
-            return map.get(ip);
-        }
-        LOGGER.debug("Unknown ip {}, ip -> service is {}", ip, map);
-        return ServiceMetaInfo.UNKNOWN;
+    protected ServiceMetaInfo find(String ip) {
+        return serviceRegistry.findService(ip);
     }
 
     protected void forward(ServiceMeshMetric.Builder metric) {
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
index 41ba0e5..c449c2f 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/ServiceMetaInfo.java
@@ -25,6 +25,8 @@ import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
 
+import static java.util.Objects.nonNull;
+
 @Getter
 @Setter
 @ToString
@@ -41,6 +43,13 @@ public class ServiceMetaInfo {
         this.serviceInstanceName = serviceInstanceName;
     }
 
+    /**
+     * @return {@code true} if this object is completely constructed, otherwise {@code false}.
+     */
+    public boolean isComplete() {
+        return nonNull(serviceName) && nonNull(serviceInstanceName);
+    }
+
     @Setter
     @Getter
     @RequiredArgsConstructor
diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java
index e6c797f..dea5f70 100644
--- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java
+++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/K8sHTTPAnalysisTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.skywalking.oap.server.receiver.envoy.als;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.protobuf.util.JsonFormat;
 import io.envoyproxy.envoy.service.accesslog.v2.StreamAccessLogsMessage;
 import java.io.IOException;
@@ -34,6 +33,10 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class K8sHTTPAnalysisTest {
 
     private MockK8sAnalysis analysis;
@@ -147,12 +150,12 @@ public class K8sHTTPAnalysisTest {
 
         @Override
         public void init(EnvoyMetricReceiverConfig config) {
-            getIpServiceMap().set(
-                ImmutableMap.of("10.44.2.56", new ServiceMetaInfo("ingress", 
"ingress-Inst"), "10.44.2.54",
-                                new ServiceMetaInfo("productpage", 
"productpage-Inst"), "10.44.6.66",
-                                new ServiceMetaInfo("detail", "detail-Inst"), 
"10.44.2.55",
-                                new ServiceMetaInfo("review", "detail-Inst")
-                ));
+            serviceRegistry = mock(K8SServiceRegistry.class);
+            
when(serviceRegistry.findService(anyString())).thenReturn(ServiceMetaInfo.UNKNOWN);
+            when(serviceRegistry.findService("10.44.2.56")).thenReturn(new 
ServiceMetaInfo("ingress", "ingress-Inst"));
+            when(serviceRegistry.findService("10.44.2.54")).thenReturn(new 
ServiceMetaInfo("productpage", "productpage-Inst"));
+            when(serviceRegistry.findService("10.44.6.66")).thenReturn(new 
ServiceMetaInfo("detail", "detail-Inst"));
+            when(serviceRegistry.findService("10.44.2.55")).thenReturn(new 
ServiceMetaInfo("review", "detail-Inst"));
         }
 
         @Override
diff --git a/test/e2e-mesh/e2e-istio/scripts/istio.sh b/test/e2e-mesh/e2e-istio/scripts/istio.sh
index ffd432c..2578818 100644
--- a/test/e2e-mesh/e2e-istio/scripts/istio.sh
+++ b/test/e2e-mesh/e2e-istio/scripts/istio.sh
@@ -21,7 +21,6 @@
 
 set -ex
 
-curl -L https://istio.io/downloadIstio | sh -
-sudo mv $PWD/istio-$ISTIO_VERSION/bin/istioctl /usr/local/bin/
+istioctl version || (curl -L https://istio.io/downloadIstio | sh - && sudo mv $PWD/istio-$ISTIO_VERSION/bin/istioctl /usr/local/bin/)
 istioctl install $@
 kubectl label namespace default istio-injection=enabled

Reply via email to