This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c31efb  [BEAM-6865] Move MetricsApi updates from flink.metrics to core.metrics
     new 65dbb4d  Merge pull request #8176 from ibzib/update-metrics
5c31efb is described below

commit 5c31efbae5fa40c024a47a44af3e6bc8a079cc2a
Author: Kyle Weaver <kcwea...@google.com>
AuthorDate: Fri Mar 29 18:02:01 2019 -0700

    [BEAM-6865] Move MetricsApi updates from flink.metrics to core.metrics
---
 .../runners/core/metrics/MetricsContainerImpl.java | 47 ++++++++++++++++++++++
 .../flink/metrics/FlinkMetricContainer.java        | 46 +--------------------
 2 files changed, 49 insertions(+), 44 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
index 630f619..bf5acfe 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.core.metrics;
 
+import static org.apache.beam.runners.core.metrics.MetricUrns.parseUrn;
 import static 
org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.Serializable;
@@ -25,15 +26,23 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import javax.annotation.Nullable;
+import org.apache.beam.model.pipeline.v1.MetricsApi.CounterData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.DistributionData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.ExtremaData;
+import org.apache.beam.model.pipeline.v1.MetricsApi.IntDistributionData;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.Metric;
 import org.apache.beam.sdk.metrics.MetricKey;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Holds the metrics for a single step and uses metric cells that allow 
extracting the cumulative
@@ -50,6 +59,7 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableLis
  */
 @Experimental(Kind.METRICS)
 public class MetricsContainerImpl implements Serializable, MetricsContainer {
+  private static final Logger LOG = 
LoggerFactory.getLogger(MetricsContainerImpl.class);
 
   @Nullable private final String stepName;
 
@@ -231,6 +241,43 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer {
     updateGauges(gauges, other.gauges);
   }
 
+  /** Update values of this {@link MetricsContainerImpl} by reading from 
{@code monitoringInfos}. */
+  public void update(Iterable<MonitoringInfo> monitoringInfos) {
+    monitoringInfos.forEach(
+        monitoringInfo -> {
+          if (monitoringInfo.hasMetric()) {
+            String urn = monitoringInfo.getUrn();
+            MetricName metricName = parseUrn(urn);
+            org.apache.beam.model.pipeline.v1.MetricsApi.Metric metric = 
monitoringInfo.getMetric();
+            if (metric.hasCounterData()) {
+              CounterData counterData = metric.getCounterData();
+              if (counterData.getValueCase() == 
CounterData.ValueCase.INT64_VALUE) {
+                Counter counter = getCounter(metricName);
+                counter.inc(counterData.getInt64Value());
+              } else {
+                LOG.warn("Unsupported CounterData type: {}", counterData);
+              }
+            } else if (metric.hasDistributionData()) {
+              DistributionData distributionData = metric.getDistributionData();
+              if (distributionData.hasIntDistributionData()) {
+                Distribution distribution = getDistribution(metricName);
+                IntDistributionData intDistributionData = 
distributionData.getIntDistributionData();
+                distribution.update(
+                    intDistributionData.getSum(),
+                    intDistributionData.getCount(),
+                    intDistributionData.getMin(),
+                    intDistributionData.getMax());
+              } else {
+                LOG.warn("Unsupported DistributionData type: {}", 
distributionData);
+              }
+            } else if (metric.hasExtremaData()) {
+              ExtremaData extremaData = metric.getExtremaData();
+              LOG.warn("Extrema metric unsupported: {}", extremaData);
+            }
+          }
+        });
+  }
+
   private void updateCounters(
       MetricsMap<MetricName, CounterCell> current, MetricsMap<MetricName, 
CounterCell> updates) {
     for (Map.Entry<MetricName, CounterCell> counter : updates.entries()) {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
index 87c9afd..8a1781d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
@@ -17,21 +17,14 @@
  */
 package org.apache.beam.runners.flink.metrics;
 
-import static org.apache.beam.runners.core.metrics.MetricUrns.parseUrn;
 import static 
org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.model.pipeline.v1.MetricsApi.CounterData;
-import org.apache.beam.model.pipeline.v1.MetricsApi.DistributionData;
-import org.apache.beam.model.pipeline.v1.MetricsApi.ExtremaData;
-import org.apache.beam.model.pipeline.v1.MetricsApi.IntDistributionData;
-import org.apache.beam.model.pipeline.v1.MetricsApi.Metric;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
-import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.GaugeResult;
 import org.apache.beam.sdk.metrics.MetricKey;
@@ -39,7 +32,6 @@ import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricQueryResults;
 import org.apache.beam.sdk.metrics.MetricResult;
 import org.apache.beam.sdk.metrics.MetricResults;
-import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.metrics.MetricsFilter;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -89,7 +81,7 @@ public class FlinkMetricContainer {
     this.metricsAccumulator = (MetricsAccumulator) metricsAccumulator;
   }
 
-  public MetricsContainer getMetricsContainer(String stepName) {
+  public MetricsContainerImpl getMetricsContainer(String stepName) {
     return metricsAccumulator != null
         ? metricsAccumulator.getLocalValue().getContainer(stepName)
         : null;
@@ -100,41 +92,7 @@ public class FlinkMetricContainer {
    * along to Flink's internal metrics framework.
    */
   public void updateMetrics(String stepName, List<MonitoringInfo> 
monitoringInfos) {
-    MetricsContainer metricsContainer = getMetricsContainer(stepName);
-    monitoringInfos.forEach(
-        monitoringInfo -> {
-          if (monitoringInfo.hasMetric()) {
-            String urn = monitoringInfo.getUrn();
-            MetricName metricName = parseUrn(urn);
-            Metric metric = monitoringInfo.getMetric();
-            if (metric.hasCounterData()) {
-              CounterData counterData = metric.getCounterData();
-              if (counterData.getValueCase() == 
CounterData.ValueCase.INT64_VALUE) {
-                org.apache.beam.sdk.metrics.Counter counter =
-                    metricsContainer.getCounter(metricName);
-                counter.inc(counterData.getInt64Value());
-              } else {
-                LOG.warn("Unsupported CounterData type: {}", counterData);
-              }
-            } else if (metric.hasDistributionData()) {
-              DistributionData distributionData = metric.getDistributionData();
-              if (distributionData.hasIntDistributionData()) {
-                Distribution distribution = 
metricsContainer.getDistribution(metricName);
-                IntDistributionData intDistributionData = 
distributionData.getIntDistributionData();
-                distribution.update(
-                    intDistributionData.getSum(),
-                    intDistributionData.getCount(),
-                    intDistributionData.getMin(),
-                    intDistributionData.getMax());
-              } else {
-                LOG.warn("Unsupported DistributionData type: {}", 
distributionData);
-              }
-            } else if (metric.hasExtremaData()) {
-              ExtremaData extremaData = metric.getExtremaData();
-              LOG.warn("Extrema metric unsupported: {}", extremaData);
-            }
-          }
-        });
+    getMetricsContainer(stepName).update(monitoringInfos);
     updateMetrics(stepName);
   }
 

Reply via email to