This is an automated email from the ASF dual-hosted git repository. goenka pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 5c31efb [BEAM-6865] Move MetricsApi updates from flink.metrics to core.metrics new 65dbb4d Merge pull request #8176 from ibzib/update-metrics 5c31efb is described below commit 5c31efbae5fa40c024a47a44af3e6bc8a079cc2a Author: Kyle Weaver <kcwea...@google.com> AuthorDate: Fri Mar 29 18:02:01 2019 -0700 [BEAM-6865] Move MetricsApi updates from flink.metrics to core.metrics --- .../runners/core/metrics/MetricsContainerImpl.java | 47 ++++++++++++++++++++++ .../flink/metrics/FlinkMetricContainer.java | 46 +-------------------- 2 files changed, 49 insertions(+), 44 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java index 630f619..bf5acfe 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.core.metrics; +import static org.apache.beam.runners.core.metrics.MetricUrns.parseUrn; import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; import java.io.Serializable; @@ -25,15 +26,23 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import javax.annotation.Nullable; +import org.apache.beam.model.pipeline.v1.MetricsApi.CounterData; +import org.apache.beam.model.pipeline.v1.MetricsApi.DistributionData; +import org.apache.beam.model.pipeline.v1.MetricsApi.ExtremaData; +import org.apache.beam.model.pipeline.v1.MetricsApi.IntDistributionData; import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo; import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metric; import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Holds the metrics for a single step and uses metric cells that allow extracting the cumulative @@ -50,6 +59,7 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableLis */ @Experimental(Kind.METRICS) public class MetricsContainerImpl implements Serializable, MetricsContainer { + private static final Logger LOG = LoggerFactory.getLogger(MetricsContainerImpl.class); @Nullable private final String stepName; @@ -231,6 +241,43 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer { updateGauges(gauges, other.gauges); } + /** Update values of this {@link MetricsContainerImpl} by reading from {@code monitoringInfos}. */ + public void update(Iterable<MonitoringInfo> monitoringInfos) { + monitoringInfos.forEach( + monitoringInfo -> { + if (monitoringInfo.hasMetric()) { + String urn = monitoringInfo.getUrn(); + MetricName metricName = parseUrn(urn); + org.apache.beam.model.pipeline.v1.MetricsApi.Metric metric = monitoringInfo.getMetric(); + if (metric.hasCounterData()) { + CounterData counterData = metric.getCounterData(); + if (counterData.getValueCase() == CounterData.ValueCase.INT64_VALUE) { + Counter counter = getCounter(metricName); + counter.inc(counterData.getInt64Value()); + } else { + LOG.warn("Unsupported CounterData type: {}", counterData); + } + } else if (metric.hasDistributionData()) { + DistributionData distributionData = metric.getDistributionData(); + if (distributionData.hasIntDistributionData()) { + Distribution distribution = getDistribution(metricName); + IntDistributionData intDistributionData = distributionData.getIntDistributionData(); + distribution.update( + intDistributionData.getSum(), + intDistributionData.getCount(), + intDistributionData.getMin(), + intDistributionData.getMax()); + } else { + LOG.warn("Unsupported DistributionData type: {}", distributionData); + } + } else if (metric.hasExtremaData()) { + ExtremaData extremaData = metric.getExtremaData(); + LOG.warn("Extrema metric unsupported: {}", extremaData); + } + } + }); + } + private void updateCounters( MetricsMap<MetricName, CounterCell> current, MetricsMap<MetricName, CounterCell> updates) { for (Map.Entry<MetricName, CounterCell> counter : updates.entries()) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java index 87c9afd..8a1781d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java @@ -17,21 +17,14 @@ */ package org.apache.beam.runners.flink.metrics; -import static org.apache.beam.runners.core.metrics.MetricUrns.parseUrn; import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.beam.model.pipeline.v1.MetricsApi.CounterData; -import org.apache.beam.model.pipeline.v1.MetricsApi.DistributionData; -import org.apache.beam.model.pipeline.v1.MetricsApi.ExtremaData; -import org.apache.beam.model.pipeline.v1.MetricsApi.IntDistributionData; -import org.apache.beam.model.pipeline.v1.MetricsApi.Metric; import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; -import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.GaugeResult; import org.apache.beam.sdk.metrics.MetricKey; @@ -39,7 +32,6 @@ import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricResults; -import org.apache.beam.sdk.metrics.MetricsContainer; import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; import org.apache.flink.api.common.accumulators.Accumulator; @@ -89,7 +81,7 @@ public class FlinkMetricContainer { this.metricsAccumulator = (MetricsAccumulator) metricsAccumulator; } - public MetricsContainer getMetricsContainer(String stepName) { + public MetricsContainerImpl getMetricsContainer(String stepName) { return metricsAccumulator != null ? metricsAccumulator.getLocalValue().getContainer(stepName) : null; @@ -100,41 +92,7 @@ public class FlinkMetricContainer { * along to Flink's internal metrics framework. */ public void updateMetrics(String stepName, List<MonitoringInfo> monitoringInfos) { - MetricsContainer metricsContainer = getMetricsContainer(stepName); - monitoringInfos.forEach( - monitoringInfo -> { - if (monitoringInfo.hasMetric()) { - String urn = monitoringInfo.getUrn(); - MetricName metricName = parseUrn(urn); - Metric metric = monitoringInfo.getMetric(); - if (metric.hasCounterData()) { - CounterData counterData = metric.getCounterData(); - if (counterData.getValueCase() == CounterData.ValueCase.INT64_VALUE) { - org.apache.beam.sdk.metrics.Counter counter = - metricsContainer.getCounter(metricName); - counter.inc(counterData.getInt64Value()); - } else { - LOG.warn("Unsupported CounterData type: {}", counterData); - } - } else if (metric.hasDistributionData()) { - DistributionData distributionData = metric.getDistributionData(); - if (distributionData.hasIntDistributionData()) { - Distribution distribution = metricsContainer.getDistribution(metricName); - IntDistributionData intDistributionData = distributionData.getIntDistributionData(); - distribution.update( - intDistributionData.getSum(), - intDistributionData.getCount(), - intDistributionData.getMin(), - intDistributionData.getMax()); - } else { - LOG.warn("Unsupported DistributionData type: {}", distributionData); - } - } else if (metric.hasExtremaData()) { - ExtremaData extremaData = metric.getExtremaData(); - LOG.warn("Extrema metric unsupported: {}", extremaData); - } - } - }); + getMetricsContainer(stepName).update(monitoringInfos); updateMetrics(stepName); }