This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 92c0cb8  Add function `retagByK8sMeta` and opt type 
`K8sRetagType.Pod2Service` in MAL for k8s to relate pods and services. (#6608)
92c0cb8 is described below

commit 92c0cb856d4df5ded4708d285c4e0ecce97dfd7a
Author: wankai123 <[email protected]>
AuthorDate: Thu Mar 25 20:50:58 2021 +0800

    Add function `retagByK8sMeta` and opt type `K8sRetagType.Pod2Service` in 
MAL for k8s to relate pods and services. (#6608)
---
 CHANGES.md                                         |   1 +
 docs/en/concepts-and-designs/mal.md                |  26 +++
 oap-server/analyzer/meter-analyzer/pom.xml         |   6 +-
 .../skywalking/oap/meter/analyzer/Analyzer.java    |   5 +
 .../skywalking/oap/meter/analyzer/dsl/DSL.java     |   5 +
 .../analyzer/dsl/ExpressionParsingContext.java     |   6 +-
 .../oap/meter/analyzer/dsl/SampleFamily.java       |  13 ++
 .../meter/analyzer/dsl/tagOpt/K8sRetagType.java    |  50 +++++
 .../oap/meter/analyzer/dsl/tagOpt/Retag.java       |  25 +++
 .../oap/meter/analyzer/k8s/K8sInfoRegistry.java    | 224 +++++++++++++++++++++
 .../oap/meter/analyzer/dsl/K8sTagTest.java         | 212 +++++++++++++++++++
 11 files changed, 571 insertions(+), 2 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 986942e..09d958f 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -59,6 +59,7 @@ Release Notes.
 * Add functions in MAL to filter metrics according to the metric value.
 * Optimize the self monitoring grafana dashboard.
 * Enhance the export service.
+* Add function `retagByK8sMeta` and opt type `K8sRetagType.Pod2Service` in MAL 
for k8s to relate pods and services.
 
 #### UI
 * Update selector scroller to show in all pages.
diff --git a/docs/en/concepts-and-designs/mal.md 
b/docs/en/concepts-and-designs/mal.md
index 7770b62..4a3d7fb 100644
--- a/docs/en/concepts-and-designs/mal.md
+++ b/docs/en/concepts-and-designs/mal.md
@@ -57,6 +57,31 @@ For example, this filters all instance_trace_count samples 
for values >= 33:
 ```
 instance_trace_count.valueGreaterEqual(33)
 ```
+### Tag manipulator
+MAL provides tag manipulators to change (add/delete/update) tags and their values.
+
+#### K8s
+MAL supports using the metadata of k8s to manipulate the tags and their values.
+This feature requires the OAP Server to have permission to access the K8s `API Server`.
+
+##### retagByK8sMeta
+`retagByK8sMeta(newLabelName, K8sRetagType, existingLabelName)`. Adds a new tag to the sample family based on the value of an existing label. Several built-in conversion types are provided, including:
+- K8sRetagType.Pod2Service  
+
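+Adds a tag to each sample, using `newLabelName` (`service` in the example below) as the key and `$serviceName.$namespace` as the value. The service is looked up from the value of the `existingLabelName` tag, which must hold the name of a pod. Samples whose pod cannot be resolved to a service are left unchanged.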
+
+For example:
+```
+container_cpu_usage_seconds_total{container=my-nginx, cpu=total, 
pod=my-nginx-5dc4865748-mbczh} 2
+```
+Expression:
+```
+container_cpu_usage_seconds_total.retagByK8sMeta('service' , 
K8sRetagType.Pod2Service , 'pod')
+```
+Output:
+```
+container_cpu_usage_seconds_total{container=my-nginx, cpu=total, 
pod=my-nginx-5dc4865748-mbczh, service='nginx-service.default'} 2
+```
 
 ### Binary operators
 
@@ -186,6 +211,7 @@ Examples:
 #### time
 `time()`. returns the number of seconds since January 1, 1970 UTC.
 
+
 ## Down Sampling Operation
 MAL should instruct meter-system how to do downsampling for metrics. It 
doesn't only refer to aggregate raw samples to 
 `minute` level, but also hints data from `minute` to higher levels, for 
instance, `hour` and `day`. 
diff --git a/oap-server/analyzer/meter-analyzer/pom.xml 
b/oap-server/analyzer/meter-analyzer/pom.xml
index 5b99036..d4723b7 100644
--- a/oap-server/analyzer/meter-analyzer/pom.xml
+++ b/oap-server/analyzer/meter-analyzer/pom.xml
@@ -33,7 +33,6 @@
             <artifactId>server-core</artifactId>
             <version>${project.version}</version>
         </dependency>
-
         <dependency>
             <groupId>org.codehaus.groovy</groupId>
             <artifactId>groovy</artifactId>
@@ -42,6 +41,11 @@
             <groupId>io.vavr</groupId>
             <artifactId>vavr</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.kubernetes</groupId>
+            <artifactId>client-java</artifactId>
+            <version>${kubernetes.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
index 7c39d87..4b9975e 100644
--- 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
+++ 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/Analyzer.java
@@ -34,6 +34,7 @@ import 
org.apache.skywalking.oap.meter.analyzer.dsl.ExpressionParsingContext;
 import org.apache.skywalking.oap.meter.analyzer.dsl.Result;
 import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
 import org.apache.skywalking.oap.meter.analyzer.dsl.SampleFamily;
+import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
 import org.apache.skywalking.oap.server.core.analysis.NodeType;
 import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
 import 
org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic;
@@ -219,6 +220,10 @@ public class Analyzer {
             }
         }
         createMetric(ctx.getScopeType(), metricType.literal, 
ctx.getDownsampling());
+
+        if (ctx.isRetagByK8sMeta()) {
+            K8sInfoRegistry.getInstance().start();
+        }
     }
 
     private void createMetric(final ScopeType scopeType,
diff --git 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DSL.java
 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DSL.java
index 45fa7ee..65a93ae 100644
--- 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DSL.java
+++ 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/DSL.java
@@ -21,7 +21,9 @@ package org.apache.skywalking.oap.meter.analyzer.dsl;
 import groovy.lang.Binding;
 import groovy.lang.GroovyShell;
 import groovy.util.DelegatingScript;
+import org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt.K8sRetagType;
 import org.codehaus.groovy.control.CompilerConfiguration;
+import org.codehaus.groovy.control.customizers.ImportCustomizer;
 
 /**
  * DSL combines methods to parse groovy based DSL expression.
@@ -37,6 +39,9 @@ public final class DSL {
     public static Expression parse(final String expression) {
         CompilerConfiguration cc = new CompilerConfiguration();
         cc.setScriptBaseClass(DelegatingScript.class.getName());
+        ImportCustomizer icz = new ImportCustomizer();
+        icz.addImport("K8sRetagType", K8sRetagType.class.getName());
+        cc.addCompilationCustomizers(icz);
         GroovyShell sh = new GroovyShell(new Binding(), cc);
         DelegatingScript script = (DelegatingScript) sh.parse(expression);
         return new Expression(expression, script);
diff --git 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
index 2a421fc..f571cca 100644
--- 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
+++ 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/ExpressionParsingContext.java
@@ -61,7 +61,6 @@ public class ExpressionParsingContext implements Closeable {
     List<String> samples;
 
     boolean isHistogram;
-
     int[] percentiles;
 
     Set<String> aggregationLabels;
@@ -73,6 +72,11 @@ public class ExpressionParsingContext implements Closeable {
     ScopeType scopeType;
 
     /**
+     * Mark whether the retagByK8sMeta func in expressions is active
+     */
+    boolean isRetagByK8sMeta;
+
+    /**
      * Get labels no scope related.
      *
      * @return labels
diff --git 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
index be53356..b4c82d0 100644
--- 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
+++ 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java
@@ -38,6 +38,7 @@ import 
org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.EndpointEn
 import 
org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.EntityDescription;
 import 
org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.InstanceEntityDescription;
 import 
org.apache.skywalking.oap.meter.analyzer.dsl.EntityDescription.ServiceEntityDescription;
+import org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt.K8sRetagType;
 import org.apache.skywalking.oap.server.core.UnexpectedException;
 import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
 import org.apache.skywalking.oap.server.core.analysis.meter.ScopeType;
@@ -310,6 +311,18 @@ public class SampleFamily {
         );
     }
 
+    /**
+     * Re-tags the samples of this family through the given K8s retag operation.
+     *
+     * <p>When an {@link ExpressionParsingContext} is active, it is flagged as using
+     * {@code retagByK8sMeta}; this happens before the {@code EMPTY} short-circuit.
+     *
+     * @param newLabelName      name of the label to add, must not be null or empty
+     * @param type              the retag operation to apply, e.g. {@code K8sRetagType.Pod2Service}
+     * @param existingLabelName name of the label whose value drives the lookup, must not be null or empty
+     * @return {@code EMPTY} when this family is {@code EMPTY}, otherwise a new family built from the
+     *         samples returned by {@code type}
+     */
+    public SampleFamily retagByK8sMeta(String newLabelName, K8sRetagType type, String existingLabelName) {
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(newLabelName));
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(existingLabelName));
+        // Record the usage on the parsing context, if there is one, even for an EMPTY family.
+        ExpressionParsingContext.get().ifPresent(ctx -> ctx.isRetagByK8sMeta = true);
+        if (this == EMPTY) {
+            return EMPTY;
+        }
+
+        final Sample[] retagged = type.execute(samples, newLabelName, existingLabelName);
+        return SampleFamily.build(this.context, retagged);
+    }
+
     public SampleFamily histogram() {
         return histogram("le", this.context.defaultHistogramBucketUnit);
     }
diff --git 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/tagOpt/K8sRetagType.java
 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/tagOpt/K8sRetagType.java
new file mode 100644
index 0000000..8647831
--- /dev/null
+++ 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/tagOpt/K8sRetagType.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
+import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
+
+/**
+ * Retag operations that derive a new label from K8s metadata held by {@link K8sInfoRegistry}.
+ */
+public enum K8sRetagType implements Retag {
+
+    /**
+     * Treats the value of {@code existingLabelName} as a pod name and, when the registry knows a
+     * service for that pod, adds it under {@code newLabelName}. Samples without that label, or whose
+     * pod has no known service, are returned as they are.
+     */
+    Pod2Service {
+        @Override
+        public Sample[] execute(final Sample[] ss, final String newLabelName, final String existingLabelName) {
+            return Arrays.stream(ss).map(sample -> {
+                final String podName = sample.getLabels().get(existingLabelName);
+                if (Strings.isNullOrEmpty(podName)) {
+                    return sample;
+                }
+
+                final String serviceName = K8sInfoRegistry.getInstance().findServiceName(podName);
+                if (Strings.isNullOrEmpty(serviceName)) {
+                    return sample;
+                }
+
+                // Copy into a mutable map first so an existing newLabelName entry is overwritten.
+                final Map<String, String> relabeled = Maps.newHashMap(sample.getLabels());
+                relabeled.put(newLabelName, serviceName);
+                return sample.toBuilder().labels(ImmutableMap.copyOf(relabeled)).build();
+            }).toArray(Sample[]::new);
+        }
+    }
+}
diff --git 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/tagOpt/Retag.java
 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/tagOpt/Retag.java
new file mode 100644
index 0000000..eda574e
--- /dev/null
+++ 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/tagOpt/Retag.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.dsl.tagOpt;
+
+import org.apache.skywalking.oap.meter.analyzer.dsl.Sample;
+
+/**
+ * A tag manipulation that produces samples carrying a label derived from an existing label.
+ */
+public interface Retag {
+    /**
+     * Applies the manipulation to the given samples.
+     *
+     * @param ss                the samples to process
+     * @param newLabelName      name of the label to add
+     * @param existingLabelName name of the label whose value is used as the lookup input
+     * @return the resulting samples
+     */
+    Sample[] execute(Sample[] ss, String newLabelName, String 
existingLabelName);
+}
diff --git 
a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
new file mode 100644
index 0000000..694cd31
--- /dev/null
+++ 
b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/k8s/K8sInfoRegistry.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.k8s;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.kubernetes.client.informer.ResourceEventHandler;
+import io.kubernetes.client.informer.SharedInformerFactory;
+import io.kubernetes.client.openapi.Configuration;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1Endpoints;
+import io.kubernetes.client.openapi.models.V1EndpointsList;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1PodList;
+import io.kubernetes.client.util.Config;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.models.V1Pod;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.Objects.isNull;
+import static java.util.Optional.ofNullable;
+
+/**
+ * Caches K8s pod and endpoints metadata so that a pod name can be resolved to the
+ * {@code serviceName.namespace} of the service backing it.
+ *
+ * <p>A shared instance is exposed through {@link #getInstance()}. After {@link #start()} has been
+ * called, informer callbacks maintain three maps: pod IP to pod, pod IP to service, and the derived
+ * pod name to service map read by {@link #findServiceName(String)}.
+ */
+@Slf4j
+public class K8sInfoRegistry {
+
+    private final static K8sInfoRegistry INSTANCE = new K8sInfoRegistry();
+    // Flipped by the first start() call so the informers are registered only once.
+    private final AtomicBoolean isStarted = new AtomicBoolean(false);
+    private final Map<String/* ip */, V1Pod> ipPodMap = new ConcurrentHashMap<>();
+    private final Map<String/* ip */, String/* serviceName.namespace */> ipServiceMap =
+        new ConcurrentHashMap<>();
+    private final Map<String/* podName */, String /* serviceName.namespace */> podServiceMap =
+        new ConcurrentHashMap<>();
+    private ExecutorService executor;
+
+    /**
+     * @return the shared registry instance
+     */
+    public static K8sInfoRegistry getInstance() {
+        return INSTANCE;
+    }
+
+    /**
+     * Creates the daemon thread pool handed to the informer factory.
+     */
+    private void init() {
+        executor = Executors.newCachedThreadPool(
+            new ThreadFactoryBuilder()
+                .setNameFormat("K8sInfoRegistry-%d")
+                .setDaemon(true)
+                .build()
+        );
+    }
+
+    /**
+     * Builds the default K8s API client, registers the endpoints and pod informers and starts them.
+     * Only the first call does any work; later calls return immediately.
+     *
+     * <p>NOTE(review): {@code isStarted} is set before the client is built, so if building the
+     * client throws, later calls will not retry.
+     */
+    @SneakyThrows
+    public void start() {
+        if (isStarted.compareAndSet(false, true)) {
+            init();
+            final ApiClient apiClient = Config.defaultClient();
+            // Read timeout of 0 - presumably "no timeout" so long-lived watches are not cut off;
+            // confirm against the HTTP client's documentation.
+            apiClient.setHttpClient(apiClient.getHttpClient()
+                                             .newBuilder()
+                                             .readTimeout(0, TimeUnit.SECONDS)
+                                             .build());
+            Configuration.setDefaultApiClient(apiClient);
+
+            final CoreV1Api coreV1Api = new CoreV1Api();
+            final SharedInformerFactory factory = new SharedInformerFactory(executor);
+
+            listenEndpointsEvents(coreV1Api, factory);
+            listenPodEvents(coreV1Api, factory);
+            factory.startAllRegisteredInformers();
+        }
+    }
+
+    /**
+     * Registers an informer over endpoints in all namespaces; add and update events feed
+     * {@link #addEndpoints(V1Endpoints)}, delete events feed {@link #removeEndpoints(V1Endpoints)}.
+     */
+    private void listenEndpointsEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) {
+        factory.sharedIndexInformerFor(
+            // Positional arguments: only resourceVersion, the value 300 (presumably a timeout in
+            // seconds - confirm against CoreV1Api) and watch are supplied.
+            params -> coreV1Api.listEndpointsForAllNamespacesCall(
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                params.resourceVersion,
+                300,
+                params.watch,
+                null
+            ),
+            V1Endpoints.class,
+            V1EndpointsList.class
+        ).addEventHandler(new ResourceEventHandler<V1Endpoints>() {
+            @Override
+            public void onAdd(final V1Endpoints endpoints) {
+                addEndpoints(endpoints);
+            }
+
+            @Override
+            public void onUpdate(final V1Endpoints oldEndpoints, final V1Endpoints newEndpoints) {
+                addEndpoints(newEndpoints);
+            }
+
+            @Override
+            public void onDelete(final V1Endpoints endpoints, final boolean deletedFinalStateUnknown) {
+                removeEndpoints(endpoints);
+            }
+        });
+    }
+
+    /**
+     * Registers an informer over pods in all namespaces; add and update events feed
+     * {@link #addPod(V1Pod)}, delete events feed {@link #removePod(V1Pod)}.
+     */
+    private void listenPodEvents(final CoreV1Api coreV1Api, final SharedInformerFactory factory) {
+        factory.sharedIndexInformerFor(
+            // Same positional layout as the endpoints informer above.
+            params -> coreV1Api.listPodForAllNamespacesCall(
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                params.resourceVersion,
+                300,
+                params.watch,
+                null
+            ),
+            V1Pod.class,
+            V1PodList.class
+        ).addEventHandler(new ResourceEventHandler<V1Pod>() {
+            @Override
+            public void onAdd(final V1Pod pod) {
+                addPod(pod);
+            }
+
+            @Override
+            public void onUpdate(final V1Pod oldPod, final V1Pod newPod) {
+                addPod(newPod);
+            }
+
+            @Override
+            public void onDelete(final V1Pod pod, final boolean deletedFinalStateUnknown) {
+                removePod(pod);
+            }
+        });
+    }
+
+    /**
+     * Indexes the pod by its IP (when it has one) and refreshes the pod name to service map.
+     */
+    private void addPod(final V1Pod pod) {
+        ofNullable(pod.getStatus()).ifPresent(
+            status -> ofNullable(status.getPodIP()).ifPresent(
+                ip -> ipPodMap.put(ip, pod))
+        );
+
+        recompose();
+    }
+
+    /**
+     * Drops the pod from the IP index and from the pod name to service map.
+     *
+     * <p>Both the pod IP and the pod name are null-checked before use because the concurrent maps
+     * reject null keys; a deleted pod without an IP or a name is simply skipped.
+     */
+    private void removePod(final V1Pod pod) {
+        ofNullable(pod.getStatus())
+            .map(status -> status.getPodIP())
+            .ifPresent(ipPodMap::remove);
+        ofNullable(pod.getMetadata())
+            .map(V1ObjectMeta::getName)
+            .ifPresent(podServiceMap::remove);
+    }
+
+    /**
+     * Maps every address of the endpoints object to {@code name.namespace} and refreshes the pod
+     * name to service map. Endpoints without metadata are logged and ignored.
+     */
+    private void addEndpoints(final V1Endpoints endpoints) {
+        V1ObjectMeta endpointsMetadata = endpoints.getMetadata();
+        if (isNull(endpointsMetadata)) {
+            log.error("Endpoints metadata is null: {}", endpoints);
+            return;
+        }
+
+        final String namespace = endpointsMetadata.getNamespace();
+        final String name = endpointsMetadata.getName();
+
+        ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach(
+            subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach(
+                address -> ipServiceMap.put(address.getIp(), name + "." + namespace)
+            ))
+        ));
+
+        recompose();
+    }
+
+    /**
+     * Removes every address of the endpoints object from the IP to service map and refreshes the
+     * pod name to service map.
+     */
+    private void removeEndpoints(final V1Endpoints endpoints) {
+        ofNullable(endpoints.getSubsets()).ifPresent(subsets -> subsets.forEach(
+            subset -> ofNullable(subset.getAddresses()).ifPresent(addresses -> addresses.forEach(
+                address -> ipServiceMap.remove(address.getIp())
+            ))
+        ));
+        recompose();
+    }
+
+    /**
+     * Rebuilds the pod name to service entries for all known pods by joining the IP to pod and
+     * IP to service maps on the pod IP.
+     */
+    private void recompose() {
+        ipPodMap.forEach((ip, pod) -> {
+            final V1ObjectMeta podMetadata = pod.getMetadata();
+            if (isNull(podMetadata) || isNull(podMetadata.getName())) {
+                log.warn("Pod metadata or name is null, {}", pod);
+                return;
+            }
+            final String podName = podMetadata.getName();
+
+            final String namespaceService = ipServiceMap.get(ip);
+            if (isNullOrEmpty(namespaceService)) {
+                // podServiceMap is keyed by pod name, so the stale entry must be evicted by name;
+                // removing by IP would leave it in place.
+                podServiceMap.remove(podName);
+                return;
+            }
+
+            podServiceMap.put(podName, namespaceService);
+        });
+    }
+
+    /**
+     * Looks up the service a pod belongs to.
+     *
+     * @param podName the pod name, must not be null
+     * @return {@code serviceName.namespace}, or {@code null} when no service is known for the pod
+     */
+    public String findServiceName(String podName) {
+        return this.podServiceMap.get(podName);
+    }
+}
diff --git 
a/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
 
b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
new file mode 100644
index 0000000..1996287
--- /dev/null
+++ 
b/oap-server/analyzer/meter-analyzer/src/test/java/org/apache/skywalking/oap/meter/analyzer/dsl/K8sTagTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.meter.analyzer.dsl;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Collection;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.meter.analyzer.k8s.K8sInfoRegistry;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+import org.powermock.reflect.Whitebox;
+
+import static com.google.common.collect.ImmutableMap.of;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
+
+/**
+ * Parameterized test of {@code retagByK8sMeta} with {@code K8sRetagType.Pod2Service}, run against a
+ * spied {@link K8sInfoRegistry} whose pod lookups are stubbed.
+ */
+@Slf4j
+@RunWith(Parameterized.class)
+public class K8sTagTest {
+
+    private static final String METRIC = "container_cpu_usage_seconds_total";
+    private static final String EXPRESSION =
+        "container_cpu_usage_seconds_total.retagByK8sMeta('service' , K8sRetagType.Pod2Service , 'pod')";
+    private static final String KUBE_POD = "kube-state-metrics-6f979fd498-z7xwx";
+    private static final String KUBE_SERVICE = "kube-state-metrics.kube-system";
+
+    @Parameterized.Parameter
+    public String name;
+
+    @Parameterized.Parameter(1)
+    public ImmutableMap<String, SampleFamily> input;
+
+    @Parameterized.Parameter(2)
+    public String expression;
+
+    @Parameterized.Parameter(3)
+    public Result want;
+
+    @Parameterized.Parameter(4)
+    public boolean isThrow;
+
+    @Parameterized.Parameters(name = "{index}: {0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+            testCase("Pod2Service", "my-nginx-5dc4865748-mbczh", "nginx-service.default"),
+            testCase("Pod2Service_no_pod", "my-nginx-5dc4865748-no-pod", null),
+            testCase("Pod2Service_no_service", "my-nginx-5dc4865748-no-service", null),
+        });
+    }
+
+    /**
+     * Builds one parameter row: an nginx sample plus a kube-state-metrics sample as input, and the
+     * same two samples as expected output, where the kube-state-metrics one always gains its service
+     * label and the nginx one gains {@code nginxService} only when it is non-null.
+     */
+    private static Object[] testCase(final String caseName, final String nginxPod, final String nginxService) {
+        final SampleFamily given = SampleFamilyBuilder.newBuilder(
+            sample("my-nginx", nginxPod, null, 2),
+            sample("kube-state-metrics", KUBE_POD, null, 1)
+        ).build();
+        final SampleFamily expected = SampleFamilyBuilder.newBuilder(
+            sample("my-nginx", nginxPod, nginxService, 2),
+            sample("kube-state-metrics", KUBE_POD, KUBE_SERVICE, 1)
+        ).build();
+        return new Object[] {
+            caseName,
+            of(METRIC, given),
+            EXPRESSION,
+            Result.success(expected),
+            false,
+        };
+    }
+
+    /**
+     * Builds a sample labelled with container, cpu=total and pod, plus a service label when
+     * {@code service} is non-null.
+     */
+    private static Sample sample(final String container,
+                                 final String pod,
+                                 final String service,
+                                 final double value) {
+        final ImmutableMap<String, String> labels = service == null
+            ? of("container", container, "cpu", "total", "pod", pod)
+            : of("container", container, "cpu", "total", "pod", pod, "service", service);
+        return Sample.builder().labels(labels).value(value).build();
+    }
+
+    @Before
+    public void setup() {
+        // Swap the shared registry for a spy so pod lookups can be stubbed without a K8s cluster.
+        final K8sInfoRegistry registry = Mockito.spy(K8sInfoRegistry.getInstance());
+        Whitebox.setInternalState(K8sInfoRegistry.class, "INSTANCE", registry);
+
+        when(registry.findServiceName("my-nginx-5dc4865748-mbczh")).thenReturn("nginx-service.default");
+        when(registry.findServiceName(KUBE_POD)).thenReturn(KUBE_SERVICE);
+        when(registry.findServiceName("my-nginx-5dc4865748-no-pod")).thenReturn(null);
+        when(registry.findServiceName("my-nginx-5dc4865748-no-service")).thenReturn(null);
+    }
+
+    @Test
+    public void test() {
+        final Expression parsed = DSL.parse(expression);
+        Result actual = null;
+        try {
+            actual = parsed.run(input);
+        } catch (Throwable t) {
+            if (!isThrow) {
+                log.error("Test failed", t);
+                fail("Should not throw anything");
+            }
+            return;
+        }
+        if (isThrow) {
+            fail("Should throw something");
+        }
+        assertThat(actual, is(want));
+    }
+}
\ No newline at end of file

Reply via email to