This is an automated email from the ASF dual-hosted git repository. kezhenxu94 pushed a commit to branch envoy/mal in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit be62df5f95fc8dd2b401f673d2a45ce97c7806de Author: kezhenxu94 <[email protected]> AuthorDate: Mon Dec 28 15:29:28 2020 +0800 Enhance Envoy metrics service analyzer by MAL --- CHANGES.md | 1 + docs/en/concepts-and-designs/mal.md | 8 +- .../envoy/examples/metrics/docker-compose.yaml | 2 +- .../oap/meter/analyzer/dsl/SampleFamily.java | 25 ++++- .../prometheus/PrometheusMetricConverter.java | 23 +++-- .../src/main/resources/application.yml | 1 + .../main/resources/envoy-metrics-rules/envoy.yaml | 42 +++++++++ .../src/main/resources/oal/envoy.oal | 22 ----- .../otel-oc-rules/istio-controlplane.yaml | 2 +- .../resources/ui-initialized-templates/istio.yml | 31 ++++++- .../envoy-metrics-receiver-plugin/pom.xml | 5 + .../receiver/envoy/EnvoyMetricReceiverConfig.java | 11 +++ .../envoy/EnvoyMetricReceiverProvider.java | 2 +- .../receiver/envoy/MetricServiceGRPCHandler.java | 102 +++++++++------------ .../adapters/ProtoMetricFamily2MetricsAdapter.java | 85 +++++++++++++++++ 15 files changed, 259 insertions(+), 103 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 918b005..6878198 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -38,6 +38,7 @@ Release Notes. * Add component ID for NodeJS Axios plugin. * Fix searchService method error in storage-influxdb-plugin. * Add JavaScript component ID. +* Adopt the [MAL](docs/en/concepts-and-designs/mal.md) in Envoy metrics service analyzer. #### UI * Fix un-removed tags in trace query. diff --git a/docs/en/concepts-and-designs/mal.md b/docs/en/concepts-and-designs/mal.md index 30834af..b1edc70 100644 --- a/docs/en/concepts-and-designs/mal.md +++ b/docs/en/concepts-and-designs/mal.md @@ -59,7 +59,7 @@ Between two scalars: they evaluate to another scalar that is the result of the o 1 + 2 ``` -Between a sample family and a scalar, the operator is applied to the value of every sample in the smaple family. For example: +Between a sample family and a scalar, the operator is applied to the value of every sample in the sample family. For example: ``` instance_trace_count + 2 @@ -110,9 +110,9 @@ Sample family supports the following aggregation operations that can be used to resulting in a new sample family of fewer samples(even single one) with aggregated values: - sum (calculate sum over dimensions) - - min (select minimum over dimensions) (TODO) - - max (select maximum over dimensions) (TODO) - - avg (calculate the average over dimensions) (TODO) + - min (select minimum over dimensions) + - max (select maximum over dimensions) + - avg (calculate the average over dimensions) These operations can be used to aggregate over all label dimensions or preserve distinct dimensions by inputting `by` parameter. diff --git a/docs/en/setup/envoy/examples/metrics/docker-compose.yaml b/docs/en/setup/envoy/examples/metrics/docker-compose.yaml index 2250d52..6f069da 100644 --- a/docs/en/setup/envoy/examples/metrics/docker-compose.yaml +++ b/docs/en/setup/envoy/examples/metrics/docker-compose.yaml @@ -17,7 +17,7 @@ version: "3" services: envoy: - image: envoyproxy/envoy-alpine:latest + image: envoyproxy/envoy-alpine:v1.16.2 command: /usr/local/bin/envoy -c /etc/envoy.yaml --service-cluster envoy-proxy ports: - 10000:10000 diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java index 24e9fb4..c1115d1 100644 --- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java +++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/dsl/SampleFamily.java @@ -28,6 +28,7 @@ import groovy.lang.Closure; import io.vavr.Function2; import io.vavr.Tuple; import io.vavr.Tuple2; +import java.util.function.DoubleBinaryOperator; import lombok.AccessLevel; import lombok.Builder; import lombok.EqualsAndHashCode; @@ -161,12 +162,32 @@ public class SampleFamily { /* Aggregation operators */ public SampleFamily sum(List<String> by) { + return aggregate(by, Double::sum); + } + + public SampleFamily max(List<String> by) { + return aggregate(by, Double::max); + } + + public SampleFamily min(List<String> by) { + return aggregate(by, Double::min); + } + + public SampleFamily avg(List<String> by) { + final SampleFamily summation = aggregate(by, Double::sum); + for (int i = 0; i < summation.samples.length; i++) { + summation.samples[i] = summation.samples[i].newValue(s -> s / summation.samples.length); + } + return summation; + } + + protected SampleFamily aggregate(List<String> by, DoubleBinaryOperator aggregator) { ExpressionParsingContext.get().ifPresent(ctx -> ctx.aggregationLabels.addAll(by)); if (this == EMPTY) { return EMPTY; } if (by == null) { - double result = Arrays.stream(samples).mapToDouble(s -> s.value).reduce(Double::sum).orElse(0.0D); + double result = Arrays.stream(samples).mapToDouble(s -> s.value).reduce(aggregator).orElse(0.0D); return SampleFamily.build(this.context, newSample(ImmutableMap.of(), samples[0].timestamp, result)); } return SampleFamily.build(this.context, Arrays.stream(samples) @@ -176,7 +197,7 @@ public class SampleFamily { .collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList()))) .entrySet().stream() .map(entry -> newSample(entry.getKey(), entry.getValue().get(0).timestamp, entry.getValue().stream() - .mapToDouble(s -> s.value).reduce(Double::sum).orElse(0.0D))) + .mapToDouble(s -> s.value).reduce(aggregator).orElse(0.0D))) .toArray(Sample[]::new)); } diff --git a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/PrometheusMetricConverter.java b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/PrometheusMetricConverter.java index 47c5e89..9af3960 100644 --- a/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/PrometheusMetricConverter.java +++ b/oap-server/analyzer/meter-analyzer/src/main/java/org/apache/skywalking/oap/meter/analyzer/prometheus/PrometheusMetricConverter.java @@ -86,15 +86,15 @@ public class PrometheusMetricConverter { private Stream<Tuple2<String, SampleFamily>> convertMetric(Metric metric) { return Match(metric).of( Case($(instanceOf(Histogram.class)), t -> Stream.of( - Tuple.of(metric.getName() + "_count", SampleFamilyBuilder.newBuilder(Sample.builder().name(metric.getName() + "_count") + Tuple.of(escapedName(metric.getName() + "_count"), SampleFamilyBuilder.newBuilder(Sample.builder().name(escapedName(metric.getName() + "_count")) .timestamp(metric.getTimestamp()).labels(ImmutableMap.copyOf(metric.getLabels())).value(((Histogram) metric).getSampleCount()).build()).build()), - Tuple.of(metric.getName() + "_sum", SampleFamilyBuilder.newBuilder(Sample.builder().name(metric.getName() + "_sum") + Tuple.of(escapedName(metric.getName() + "_sum"), SampleFamilyBuilder.newBuilder(Sample.builder().name(escapedName(metric.getName() + "_sum")) .timestamp(metric.getTimestamp()).labels(ImmutableMap.copyOf(metric.getLabels())).value(((Histogram) metric).getSampleSum()).build()).build()), convertToSample(metric).orElse(NIL))), Case($(instanceOf(Summary.class)), t -> Stream.of( - Tuple.of(metric.getName() + "_count", SampleFamilyBuilder.newBuilder(Sample.builder().name(metric.getName() + "_count") + Tuple.of(escapedName(metric.getName() + "_count"), SampleFamilyBuilder.newBuilder(Sample.builder().name(escapedName(metric.getName() + "_count")) .timestamp(metric.getTimestamp()).labels(ImmutableMap.copyOf(metric.getLabels())).value(((Summary) metric).getSampleCount()).build()).build()), - Tuple.of(metric.getName() + "_sum", SampleFamilyBuilder.newBuilder(Sample.builder().name(metric.getName() + "_sum") + Tuple.of(escapedName(metric.getName() + "_sum"), SampleFamilyBuilder.newBuilder(Sample.builder().name(escapedName(metric.getName() + "_sum")) .timestamp(metric.getTimestamp()).labels(ImmutableMap.copyOf(metric.getLabels())).value(((Summary) metric).getSampleSum()).build()).build()), convertToSample(metric).orElse(NIL))), Case($(), t -> Stream.of(convertToSample(metric).orElse(NIL))) @@ -104,13 +104,13 @@ public class PrometheusMetricConverter { private Optional<Tuple2<String, SampleFamily>> convertToSample(Metric metric) { Sample[] ss = Match(metric).of( Case($(instanceOf(Counter.class)), t -> Collections.singletonList(Sample.builder() - .name(t.getName()) + .name(escapedName(t.getName())) .labels(ImmutableMap.copyOf(t.getLabels())) .timestamp(t.getTimestamp()) .value(t.getValue()) .build())), Case($(instanceOf(Gauge.class)), t -> Collections.singletonList(Sample.builder() - .name(t.getName()) + .name(escapedName(t.getName())) .labels(ImmutableMap.copyOf(t.getLabels())) .timestamp(t.getTimestamp()) .value(t.getValue()) @@ -118,7 +118,7 @@ public class PrometheusMetricConverter { Case($(instanceOf(Histogram.class)), t -> t.getBuckets() .entrySet().stream() .map(b -> Sample.builder() - .name(t.getName()) + .name(escapedName(t.getName())) .labels(ImmutableMap.<String, String>builder() .putAll(t.getLabels()) .put("le", b.getKey().toString()) @@ -129,7 +129,7 @@ public class PrometheusMetricConverter { Case($(instanceOf(Summary.class)), t -> t.getQuantiles().entrySet().stream() .map(b -> Sample.builder() - .name(t.getName()) + .name(escapedName(t.getName())) .labels(ImmutableMap.<String, String>builder() .putAll(t.getLabels()) .put("quantile", b.getKey().toString()) @@ -141,6 +141,11 @@ public class PrometheusMetricConverter { if (ss.length < 1) { return Optional.empty(); } - return Optional.of(Tuple.of(metric.getName(), SampleFamilyBuilder.newBuilder(ss).build())); + return Optional.of(Tuple.of(escapedName(metric.getName()), SampleFamilyBuilder.newBuilder(ss).build())); + } + + // Returns the escaped name of the given one, with "." replaced by "_" + protected String escapedName(final String name) { + return name.replaceAll("\\.", "_"); } } diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml index 04edb32..5fbadb0 100755 --- a/oap-server/server-bootstrap/src/main/resources/application.yml +++ b/oap-server/server-bootstrap/src/main/resources/application.yml @@ -266,6 +266,7 @@ envoy-metric: # to append the version number to the service name. # Be careful, when using environment variables to pass this configuration, use single quotes(`''`) to avoid it being evaluated by the shell. k8sServiceNameRule: ${K8S_SERVICE_NAME_RULE:"${service.metadata.name}"} + enabledMALRules: ${SW_ENVOY_METRIC_MAL_RULES:"envoy"} prometheus-fetcher: selector: ${SW_PROMETHEUS_FETCHER:-} diff --git a/oap-server/server-bootstrap/src/main/resources/envoy-metrics-rules/envoy.yaml b/oap-server/server-bootstrap/src/main/resources/envoy-metrics-rules/envoy.yaml new file mode 100644 index 0000000..bac0d36 --- /dev/null +++ b/oap-server/server-bootstrap/src/main/resources/envoy-metrics-rules/envoy.yaml @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This will parse a textual representation of a duration. The formats +# accepted are based on the ISO-8601 duration format {@code PnDTnHnMn.nS} +# with days considered to be exactly 24 hours. +# <p> +# Examples: +# <pre> +# "PT20.345S" -- parses as "20.345 seconds" +# "PT15M" -- parses as "15 minutes" (where a minute is 60 seconds) +# "PT10H" -- parses as "10 hours" (where an hour is 3600 seconds) +# "P2D" -- parses as "2 days" (where a day is 24 hours or 86400 seconds) +# "P2DT3H4M" -- parses as "2 days, 3 hours and 4 minutes" +# "P-6H3M" -- parses as "-6 hours and +3 minutes" +# "-P6H3M" -- parses as "-6 hours and -3 minutes" +# "-P-6H+3M" -- parses as "+6 hours and -3 minutes" +# </pre> + +expSuffix: tag({tags -> tags.cluster = 'istio::' + tags.cluster}).instance(['cluster'], ['instance']) +metricPrefix: envoy +metricsRules: + - name: heap_memory_max_used + exp: server_memory_heap_size.max(['cluster', 'instance']) + - name: heap_memory_used + exp: server_memory_heap_size + - name: total_connections_used + exp: server_total_connections.max(['cluster', 'instance']) + - name: parent_connections_used + exp: server_parent_connections.max(['cluster', 'instance']) diff --git a/oap-server/server-bootstrap/src/main/resources/oal/envoy.oal b/oap-server/server-bootstrap/src/main/resources/oal/envoy.oal deleted file mode 100644 index 90d7b4d..0000000 --- a/oap-server/server-bootstrap/src/main/resources/oal/envoy.oal +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Envoy instance metrics -envoy_heap_memory_max_used = from(EnvoyInstanceMetric.value).filter(metricName == "server.memory_heap_size").maxDouble(); -envoy_total_connections_used = from(EnvoyInstanceMetric.value).filter(metricName == "server.total_connections").maxDouble(); -envoy_parent_connections_used = from(EnvoyInstanceMetric.value).filter(metricName == "server.parent_connections").maxDouble(); \ No newline at end of file diff --git a/oap-server/server-bootstrap/src/main/resources/otel-oc-rules/istio-controlplane.yaml b/oap-server/server-bootstrap/src/main/resources/otel-oc-rules/istio-controlplane.yaml index d32cf5a..3a5c3d3 100644 --- a/oap-server/server-bootstrap/src/main/resources/otel-oc-rules/istio-controlplane.yaml +++ b/oap-server/server-bootstrap/src/main/resources/otel-oc-rules/istio-controlplane.yaml @@ -28,7 +28,7 @@ # "-P6H3M" -- parses as "-6 hours and -3 minutes" # "-P-6H+3M" -- parses as "+6 hours and -3 minutes" # </pre> -expSuffix: tag({tags -> tags.cluster = 'istio-ctrl::' + tags.cluster}).service(['cluster', 'app']) +expSuffix: tag({tags -> tags.cluster = 'istio::' + tags.cluster}).service(['cluster', 'app']) metricPrefix: meter_istio metricsRules: ## Resource usage diff --git a/oap-server/server-bootstrap/src/main/resources/ui-initialized-templates/istio.yml b/oap-server/server-bootstrap/src/main/resources/ui-initialized-templates/istio.yml index bea98db..ed89ce8 100644 --- a/oap-server/server-bootstrap/src/main/resources/ui-initialized-templates/istio.yml +++ b/oap-server/server-bootstrap/src/main/resources/ui-initialized-templates/istio.yml @@ -30,7 +30,7 @@ templates: { "name": "Istio", "type": "service", - "serviceGroup": "istio-ctrl", + "serviceGroup": "istio", "children": [ { "name": "Control Plane", @@ -164,6 +164,33 @@ templates: "chartType": "ChartArea" } ] + }, + { + "name": "Data Plane", + "children": [ + { + "width": "3", + "title": "Heap Memory Max Used", + "height": 350, + "entityType": "ServiceInstance", + "independentSelector": false, + "metricType": "REGULAR_VALUE", + "metricName": "envoy_heap_memory_max_used,envoy_heap_memory_used", + "queryMetricType": "readMetricsValues", + "chartType": "ChartLine" + }, + { + "width": "3", + "title": "Connections Used", + "height": 350, + "entityType": "ServiceInstance", + "independentSelector": false, + "metricType": "REGULAR_VALUE", + "metricName": "envoy_total_connections_used,envoy_parent_connections_used", + "queryMetricType": "readMetricsValues", + "chartType": "ChartLine" + } + ] } ] } @@ -172,4 +199,4 @@ templates: # False means providing a basic template, user needs to add it manually. activated: true # True means wouldn't show up on the dashboard. Only keeps the definition in the storage. - disabled: false \ No newline at end of file + disabled: false diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml index 93eed36..c1caf8b 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/pom.xml @@ -36,6 +36,11 @@ </dependency> <dependency> <groupId>org.apache.skywalking</groupId> + <artifactId>meter-analyzer</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.skywalking</groupId> <artifactId>skywalking-mesh-receiver-plugin</artifactId> <version>${project.version}</version> </dependency> diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java index 5819d80..5a7b31b 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverConfig.java @@ -18,13 +18,17 @@ package org.apache.skywalking.oap.server.receiver.envoy; +import com.google.common.base.Splitter; import com.google.common.base.Strings; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; import lombok.Getter; +import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rule; +import org.apache.skywalking.oap.meter.analyzer.prometheus.rule.Rules; import org.apache.skywalking.oap.server.library.module.ModuleConfig; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; public class EnvoyMetricReceiverConfig extends ModuleConfig { @Getter @@ -32,6 +36,8 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig { private String alsHTTPAnalysis; @Getter private String k8sServiceNameRule; + @Getter + private String enabledMALRules; public List<String> getAlsHTTPAnalysis() { if (Strings.isNullOrEmpty(alsHTTPAnalysis)) { @@ -39,4 +45,9 @@ public class EnvoyMetricReceiverConfig extends ModuleConfig { } return Arrays.stream(alsHTTPAnalysis.trim().split(",")).map(String::trim).collect(Collectors.toList()); } + + public List<Rule> rules() throws ModuleStartException { + final List<String> enabledRules = Splitter.on(",").trimResults().omitEmptyStrings().splitToList(getEnabledMALRules()); + return Rules.loadRules("envoy-metrics-rules", enabledRules); + } } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java index 28a9126..df723b9 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/EnvoyMetricReceiverProvider.java @@ -68,7 +68,7 @@ public class EnvoyMetricReceiverProvider extends ModuleProvider { .getService(OALEngineLoaderService.class) .load(EnvoyOALDefine.INSTANCE); - service.addHandler(new MetricServiceGRPCHandler(getManager())); + service.addHandler(new MetricServiceGRPCHandler(getManager(), config)); } service.addHandler(new AccessLogServiceGRPCHandler(getManager(), config)); } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/MetricServiceGRPCHandler.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/MetricServiceGRPCHandler.java index 0ed9b3e..1f62a33 100644 --- a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/MetricServiceGRPCHandler.java +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/MetricServiceGRPCHandler.java @@ -25,16 +25,22 @@ import io.envoyproxy.envoy.service.metrics.v2.StreamMetricsResponse; import io.grpc.stub.StreamObserver; import io.prometheus.client.Metrics; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.apm.util.StringUtil; +import org.apache.skywalking.oap.meter.analyzer.prometheus.PrometheusMetricConverter; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.analysis.IDManager; import org.apache.skywalking.oap.server.core.analysis.TimeBucket; -import org.apache.skywalking.oap.server.core.source.EnvoyInstanceMetric; +import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem; import org.apache.skywalking.oap.server.core.analysis.NodeType; import org.apache.skywalking.oap.server.core.source.ServiceInstanceUpdate; import org.apache.skywalking.oap.server.core.source.SourceReceiver; import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric; +import org.apache.skywalking.oap.server.receiver.envoy.metrics.adapters.ProtoMetricFamily2MetricsAdapter; import org.apache.skywalking.oap.server.telemetry.TelemetryModule; import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; @@ -44,10 +50,11 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; @Slf4j public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceImplBase { private final SourceReceiver sourceReceiver; - private CounterMetrics counter; - private HistogramMetrics histogram; + private final CounterMetrics counter; + private final HistogramMetrics histogram; + private final List<PrometheusMetricConverter> converters; - public MetricServiceGRPCHandler(ModuleManager moduleManager) { + public MetricServiceGRPCHandler(final ModuleManager moduleManager, final EnvoyMetricReceiverConfig config) throws ModuleStartException { sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class); MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME) .provider() @@ -60,6 +67,13 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI "envoy_metric_in_latency", "The process latency of service metrics receiver", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE ); + + final MeterSystem meterSystem = moduleManager.find(CoreModule.NAME).provider().getService(MeterSystem.class); + + converters = config.rules() + .stream() + .map(rule -> new PrometheusMetricConverter(rule, meterSystem)) + .collect(Collectors.toList()); } @Override @@ -79,17 +93,15 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI isFirst = false; StreamMetricsMessage.Identifier identifier = message.getIdentifier(); Node node = identifier.getNode(); - if (node != null) { - String nodeId = node.getId(); - if (!StringUtil.isEmpty(nodeId)) { - serviceInstanceName = nodeId; - } - String cluster = node.getCluster(); - if (!StringUtil.isEmpty(cluster)) { - serviceName = cluster; - if (serviceInstanceName == null) { - serviceInstanceName = serviceName; - } + String nodeId = node.getId(); + if (!StringUtil.isEmpty(nodeId)) { + serviceInstanceName = nodeId; + } + String cluster = node.getCluster(); + if (!StringUtil.isEmpty(cluster)) { + serviceName = cluster; + if (serviceInstanceName == null) { + serviceInstanceName = serviceName; } } @@ -108,53 +120,23 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI if (StringUtil.isNotEmpty(serviceName) && StringUtil.isNotEmpty(serviceInstanceName)) { List<Metrics.MetricFamily> list = message.getEnvoyMetricsList(); boolean needHeartbeatUpdate = true; - for (int i = 0; i < list.size(); i++) { + + for (final Metrics.MetricFamily metricFamily : list) { counter.inc(); final String serviceId = IDManager.ServiceID.buildId(serviceName, NodeType.Normal); - final String serviceInstanceId = IDManager.ServiceInstanceID.buildId( - serviceId, serviceInstanceName); - - HistogramMetrics.Timer timer = histogram.createTimer(); - try { - Metrics.MetricFamily metricFamily = list.get(i); - double value = 0; - long timestamp = 0; - switch (metricFamily.getType()) { - case GAUGE: - for (Metrics.Metric metrics : metricFamily.getMetricList()) { - timestamp = metrics.getTimestampMs(); - value = metrics.getGauge().getValue(); - - if (timestamp > 1000000000000000000L) { - /** - * Several versions of envoy in istio.deps send timestamp in nanoseconds, - * instead of milliseconds(protocol says). - * - * Sadly, but have to fix it forcedly. - * - * An example of timestamp is '1552303033488741055', clearly it is not in milliseconds. - * - * This should be removed in the future. - */ - timestamp /= 1_000_000; - } - - EnvoyInstanceMetric metricSource = new EnvoyInstanceMetric(); - metricSource.setServiceId(serviceId); - metricSource.setServiceName(serviceName); - metricSource.setId(serviceInstanceId); - metricSource.setName(serviceInstanceName); - metricSource.setMetricName(metricFamily.getName()); - metricSource.setValue(value); - metricSource.setTimeBucket(TimeBucket.getMinuteTimeBucket(timestamp)); - sourceReceiver.receive(metricSource); - } - break; - default: - continue; - } - if (needHeartbeatUpdate) { + + try (final HistogramMetrics.Timer ignored = histogram.createTimer()) { + final ProtoMetricFamily2MetricsAdapter adapter = new ProtoMetricFamily2MetricsAdapter(metricFamily); + final Stream<Metric> metrics = adapter.adapt().peek(it -> { + it.getLabels().putIfAbsent("cluster", serviceName); + it.getLabels().putIfAbsent("instance", serviceInstanceName); + }); + converters.forEach(converter -> converter.toMeter(metrics)); + + if (needHeartbeatUpdate && list.get(0).getMetricCount() > 0) { + final long timestamp = adapter.adaptTimestamp(list.get(0).getMetric(0)); + // Send heartbeat ServiceInstanceUpdate serviceInstanceUpdate = new ServiceInstanceUpdate(); serviceInstanceUpdate.setName(serviceInstanceName); @@ -163,8 +145,6 @@ public class MetricServiceGRPCHandler extends MetricsServiceGrpc.MetricsServiceI sourceReceiver.receive(serviceInstanceUpdate); needHeartbeatUpdate = false; } - } finally { - timer.finish(); } } } diff --git a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/metrics/adapters/ProtoMetricFamily2MetricsAdapter.java b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/metrics/adapters/ProtoMetricFamily2MetricsAdapter.java new file mode 100644 index 0000000..578b491 --- /dev/null +++ b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/metrics/adapters/ProtoMetricFamily2MetricsAdapter.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.receiver.envoy.metrics.adapters; + +import io.prometheus.client.Metrics; +import java.util.Map; +import java.util.stream.Stream; +import lombok.RequiredArgsConstructor; +import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Gauge; +import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric; + +import static java.util.stream.Collectors.toMap; + +@RequiredArgsConstructor +public class ProtoMetricFamily2MetricsAdapter { + protected final Metrics.MetricFamily metricFamily; + + public Stream<Metric> adapt() { + // TODO: should adapt more types + switch (metricFamily.getType()) { + case GAUGE: + return metricFamily.getMetricList() + .stream() + .map(it -> Gauge.builder() + .name(adaptMetricsName(it)) + .value(adaptValue(it)) + .timestamp(adaptTimestamp(it)) + .labels(adaptLabels(it)) + .build()); + default: + return Stream.of(); + } + } + + @SuppressWarnings("unused") + public String adaptMetricsName(final Metrics.Metric metric) { + return metricFamily.getName(); + } + + public double adaptValue(final Metrics.Metric it) { + return it.getGauge().getValue(); + } + + public Map<String, String> adaptLabels(final Metrics.Metric metric) { + return metric.getLabelList() + .stream() + .collect(toMap(Metrics.LabelPair::getName, Metrics.LabelPair::getValue)); + } + + public long adaptTimestamp(final Metrics.Metric metric) { + long timestamp = metric.getTimestampMs(); + + if (timestamp > 1000000000000000000L) { + /* + * Several versions of envoy in istio.deps send timestamp in nanoseconds, + * instead of milliseconds(protocol says). + * + * Sadly, but have to fix it forcefully. + * + * An example of timestamp is '1552303033488741055', clearly it is not in milliseconds. + * + * This should be removed in the future. + */ + timestamp /= 1_000_000; + } + + return timestamp; + } +}
