Repository: incubator-beam
Updated Branches:
  refs/heads/master e969f3d38 -> 3c731707b


Implement Metrics in the DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/834933c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/834933c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/834933c5

Branch: refs/heads/master
Commit: 834933c520997b4f83cf8b04219c2c63dac61e61
Parents: 51fee39
Author: bchambers <bchamb...@google.com>
Authored: Wed Oct 12 10:55:53 2016 -0700
Committer: bchambers <bchamb...@google.com>
Committed: Thu Oct 13 15:29:29 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectMetrics.java      | 331 +++++++++++++++++++
 .../beam/runners/direct/DirectRunner.java       |   8 +-
 .../beam/runners/direct/EvaluationContext.java  |  10 +
 .../direct/ExecutorServiceParallelExecutor.java |   1 +
 .../direct/ImmutableListBundleFactory.java      |  10 +
 .../runners/direct/StepTransformResult.java     |  49 ++-
 .../beam/runners/direct/TransformExecutor.java  |  35 +-
 .../beam/runners/direct/TransformResult.java    |  12 +
 .../beam/runners/direct/DirectMetricsTest.java  | 133 ++++++++
 .../beam/runners/direct/DirectRunnerTest.java   |  36 ++
 .../runners/direct/TransformExecutorTest.java   |  12 +
 11 files changed, 602 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
new file mode 100644
index 0000000..a749a76
--- /dev/null
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static java.util.Arrays.asList;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.concurrent.GuardedBy;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.metrics.DistributionData;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricUpdates;
+import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.apache.beam.sdk.metrics.MetricsMap;
+
+/**
+ * Implementation of {@link MetricResults} for the Direct Runner.
+ */
+class DirectMetrics extends MetricResults {
+
+  // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in 
the DirectRunner.
+  private static final ExecutorService COUNTER_COMMITTER = 
Executors.newCachedThreadPool();
+
+  private interface MetricAggregation<UpdateT, ResultT> {
+    UpdateT zero();
+    UpdateT combine(Iterable<UpdateT> updates);
+    ResultT extract(UpdateT data);
+  }
+
+  /**
+   * Implementation of a metric in the direct runner.
+   *
+   * @param <UpdateT> The type of raw data received and aggregated across 
updates.
+   * @param <ResultT> The type of result extracted from the data.
+   */
+  private static class DirectMetric<UpdateT, ResultT> {
+    private final MetricAggregation<UpdateT, ResultT> aggregation;
+
+    private final AtomicReference<UpdateT> finishedCommitted;
+
+    private final Object attemptedLock = new Object();
+    @GuardedBy("attemptedLock")
+    private volatile UpdateT finishedAttempted;
+    @GuardedBy("attemptedLock")
+    private final ConcurrentMap<CommittedBundle<?>, UpdateT> inflightAttempted 
=
+        new ConcurrentHashMap<>();
+
+    public DirectMetric(MetricAggregation<UpdateT, ResultT> aggregation) {
+      this.aggregation = aggregation;
+      finishedCommitted = new AtomicReference<>(aggregation.zero());
+      finishedAttempted = aggregation.zero();
+    }
+
+    /**
+     * Add the given {@code tentativeCumulative} update to the physical 
aggregate.
+     *
+     * @param bundle The bundle receiving an update.
+     * @param tentativeCumulative The new cumulative value for the given 
bundle.
+     */
+    public void updatePhysical(CommittedBundle<?> bundle, UpdateT 
tentativeCumulative) {
+      // Add (or update) the cumulative value for the given bundle.
+      inflightAttempted.put(bundle, tentativeCumulative);
+    }
+
+    /**
+     * Commit a physical value for the given {@code bundle}.
+     *
+     * @param bundle The bundle being committed.
+     * @param finalCumulative The final cumulative value for the given bundle.
+     */
+    public void commitPhysical(final CommittedBundle<?> bundle, final UpdateT 
finalCumulative) {
+      // To prevent a query from blocking the commit, we perform the commit in 
two steps.
+      // 1. We perform a non-blocking write to the uncommitted table to make 
the new value
+      //    available immediately.
+      // 2. We submit a runnable that will commit the update and remove the 
tentative value in
+      //    a synchronized block.
+      inflightAttempted.put(bundle, finalCumulative);
+      COUNTER_COMMITTER.submit(new Runnable() {
+        @Override
+        public void run() {
+          synchronized (attemptedLock) {
+            finishedAttempted = aggregation.combine(asList(finishedAttempted, 
finalCumulative));
+            inflightAttempted.remove(bundle);
+          }
+        }
+      });
+    }
+
+    /** Extract the latest values from all attempted and in-progress bundles. 
*/
+    public ResultT extractLatestAttempted() {
+      ArrayList<UpdateT> updates = new ArrayList<>(inflightAttempted.size() + 
1);
+      // Within this block we know that the snapshot will be consistent. Specifically, the 
only change that can
+      // happen concurrently is the addition of new (larger) values to 
inflightAttempted.
+      synchronized (attemptedLock) {
+        updates.add(finishedAttempted);
+        updates.addAll(inflightAttempted.values());
+      }
+      return aggregation.extract(aggregation.combine(updates));
+    }
+
+    /**
+     * Commit a logical value for the given {@code bundle}.
+     *
+     * @param bundle The bundle being committed.
+     * @param finalCumulative The final cumulative value for the given bundle.
+     */
+    public void commitLogical(final CommittedBundle<?> bundle, final UpdateT 
finalCumulative) {
+      UpdateT current;
+      do {
+        current = finishedCommitted.get();
+      } while (!finishedCommitted.compareAndSet(current,
+          aggregation.combine(asList(current, finalCumulative))));
+    }
+
+    /** Extract the value from all successfully committed bundles. */
+    public ResultT extractCommitted() {
+      return aggregation.extract(finishedCommitted.get());
+    }
+  }
+
+  private static final MetricAggregation<Long, Long> COUNTER =
+      new MetricAggregation<Long, Long>() {
+    @Override
+    public Long zero() {
+      return 0L;
+    }
+
+    @Override
+    public Long combine(Iterable<Long> updates) {
+      long value = 0;
+      for (long update : updates) {
+        value += update;
+      }
+      return value;
+    }
+
+    @Override
+    public Long extract(Long data) {
+      return data;
+    }
+  };
+
+  private static final MetricAggregation<DistributionData, DistributionResult> 
DISTRIBUTION =
+      new MetricAggregation<DistributionData, DistributionResult>() {
+        @Override
+        public DistributionData zero() {
+          return DistributionData.EMPTY;
+        }
+
+        @Override
+        public DistributionData combine(Iterable<DistributionData> updates) {
+          DistributionData result = DistributionData.EMPTY;
+          for (DistributionData update : updates) {
+            result = result.combine(update);
+          }
+          return result;
+        }
+
+        @Override
+        public DistributionResult extract(DistributionData data) {
+          return data.extractResult();
+        }
+      };
+
+  /** The current values of counters in memory. */
+  private MetricsMap<MetricKey, DirectMetric<Long, Long>> counters =
+      new MetricsMap<>(new MetricsMap.Factory<MetricKey, DirectMetric<Long, 
Long>>() {
+        @Override
+        public DirectMetric<Long, Long> createInstance(MetricKey unusedKey) {
+          return new DirectMetric<>(COUNTER);
+        }
+      });
+  private MetricsMap<MetricKey, DirectMetric<DistributionData, 
DistributionResult>> distributions =
+      new MetricsMap<>(
+          new MetricsMap.Factory<MetricKey, DirectMetric<DistributionData, 
DistributionResult>>() {
+        @Override
+        public DirectMetric<DistributionData, DistributionResult> 
createInstance(
+            MetricKey unusedKey) {
+          return new DirectMetric<>(DISTRIBUTION);
+        }
+      });
+
+  @AutoValue
+  abstract static class DirectMetricQueryResults implements MetricQueryResults 
{
+    public static MetricQueryResults create(
+        Iterable<MetricResult<Long>> counters,
+        Iterable<MetricResult<DistributionResult>> distributions) {
+      return new AutoValue_DirectMetrics_DirectMetricQueryResults(counters, 
distributions);
+    }
+  }
+
+  @AutoValue
+  abstract static class DirectMetricResult<T> implements MetricResult<T> {
+    public static <T> MetricResult<T> create(MetricName name, String scope,
+        T committed, T attempted) {
+      return new AutoValue_DirectMetrics_DirectMetricResult<T>(
+          name, scope, committed, attempted);
+    }
+  }
+
+  @Override
+  public MetricQueryResults queryMetrics(MetricsFilter filter) {
+    ImmutableList.Builder<MetricResult<Long>> counterResults = 
ImmutableList.builder();
+    for (Entry<MetricKey, DirectMetric<Long, Long>> counter : 
counters.entries()) {
+      maybeExtractResult(filter, counterResults, counter);
+    }
+    ImmutableList.Builder<MetricResult<DistributionResult>> 
distributionResults =
+        ImmutableList.builder();
+    for (Entry<MetricKey, DirectMetric<DistributionData, DistributionResult>> 
distribution
+        : distributions.entries()) {
+      maybeExtractResult(filter, distributionResults, distribution);
+    }
+
+    return DirectMetricQueryResults.create(counterResults.build(), 
distributionResults.build());
+  }
+
+  private <ResultT> void maybeExtractResult(
+      MetricsFilter filter,
+      ImmutableList.Builder<MetricResult<ResultT>> resultsBuilder,
+      Map.Entry<MetricKey, ? extends DirectMetric<?, ResultT>> entry) {
+    if (matches(filter, entry.getKey())) {
+      resultsBuilder.add(DirectMetricResult.create(
+          entry.getKey().metricName(),
+          entry.getKey().stepName(),
+          entry.getValue().extractCommitted(),
+          entry.getValue().extractLatestAttempted()));
+    }
+  }
+
+  // Matching logic is implemented here rather than in MetricsFilter because 
we would like
+  // MetricsFilter to act as a "dumb" value-object, with the possibility of 
replacing it with
+  // a Proto/JSON/etc. schema object.
+  private boolean matches(MetricsFilter filter, MetricKey key) {
+    return matchesName(key.metricName(), filter.names())
+        && matchesScope(key.stepName(), filter.steps());
+  }
+
+  private boolean matchesScope(String actualScope, Set<String> scopes) {
+    if (scopes.isEmpty() || scopes.contains(actualScope)) {
+      return true;
+    }
+
+    for (String scope : scopes) {
+      if (actualScope.startsWith(scope)) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  private boolean matchesName(MetricName metricName, Set<MetricNameFilter> 
nameFilters) {
+    if (nameFilters.isEmpty()) {
+      return true;
+    }
+
+    for (MetricNameFilter nameFilter : nameFilters) {
+      if ((nameFilter.getName() == null || 
nameFilter.getName().equals(metricName.name()))
+          && Objects.equal(metricName.namespace(), nameFilter.getNamespace())) 
{
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /** Apply metric updates that represent physical counter deltas to the 
current metric values. */
+  public void updatePhysical(CommittedBundle<?> bundle, MetricUpdates updates) 
{
+    for (MetricUpdate<Long> counter : updates.counterUpdates()) {
+      counters.get(counter.getKey()).updatePhysical(bundle, 
counter.getUpdate());
+    }
+    for (MetricUpdate<DistributionData> distribution : 
updates.distributionUpdates()) {
+      distributions.get(distribution.getKey())
+          .updatePhysical(bundle, distribution.getUpdate());
+    }
+  }
+
+  public void commitPhysical(CommittedBundle<?> bundle, MetricUpdates updates) 
{
+    for (MetricUpdate<Long> counter : updates.counterUpdates()) {
+      counters.get(counter.getKey()).commitPhysical(bundle, 
counter.getUpdate());
+    }
+    for (MetricUpdate<DistributionData> distribution : 
updates.distributionUpdates()) {
+      distributions.get(distribution.getKey())
+          .commitPhysical(bundle, distribution.getUpdate());
+    }
+  }
+
+  /** Apply metric updates that represent new logical values from a bundle 
being committed. */
+  public void commitLogical(CommittedBundle<?> bundle, MetricUpdates updates) {
+    for (MetricUpdate<Long> counter : updates.counterUpdates()) {
+      counters.get(counter.getKey()).commitLogical(bundle, 
counter.getUpdate());
+    }
+    for (MetricUpdate<DistributionData> distribution : 
updates.distributionUpdates()) {
+      distributions.get(distribution.getKey())
+          .commitLogical(bundle, distribution.getUpdate());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index e13046d..8941093 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.TestStream;
@@ -226,6 +227,7 @@ public class DirectRunner
 
   @Override
   public DirectPipelineResult run(Pipeline pipeline) {
+    MetricsEnvironment.setMetricsSupported(true);
     ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new 
ConsumerTrackingPipelineVisitor();
     pipeline.traverseTopologically(consumerTrackingVisitor);
     for (PValue unfinalized : consumerTrackingVisitor.getUnfinalizedPValues()) 
{
@@ -268,8 +270,7 @@ public class DirectRunner
 
     Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
         pipeline.getAggregatorSteps();
-    DirectPipelineResult result =
-        new DirectPipelineResult(executor, context, aggregatorSteps);
+    DirectPipelineResult result = new DirectPipelineResult(executor, context, 
aggregatorSteps);
     if (options.isBlockOnRun()) {
       try {
         result.awaitCompletion();
@@ -383,8 +384,7 @@ public class DirectRunner
 
     @Override
     public MetricResults metrics() {
-      throw new UnsupportedOperationException(
-          "The DirectRunner does not currently support metrics.");
+      return evaluationContext.getMetrics();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 2901254..e5a30d4 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -96,6 +96,8 @@ class EvaluationContext {
 
   private final AggregatorContainer mergedAggregators;
 
+  private final DirectMetrics metrics;
+
   public static EvaluationContext create(
       DirectOptions options,
       Clock clock,
@@ -130,6 +132,7 @@ class EvaluationContext {
 
     this.applicationStateInternals = new ConcurrentHashMap<>();
     this.mergedAggregators = AggregatorContainer.create();
+    this.metrics = new DirectMetrics();
 
     this.callbackExecutor =
         WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
@@ -161,6 +164,8 @@ class EvaluationContext {
       TransformResult result) {
     Iterable<? extends CommittedBundle<?>> committedBundles =
         commitBundles(result.getOutputBundles());
+    metrics.commitLogical(completedBundle, result.getLogicalMetricUpdates());
+
     // Update watermarks and timers
     EnumSet<OutputType> outputTypes = EnumSet.copyOf(result.getOutputTypes());
     if (Iterables.isEmpty(committedBundles)) {
@@ -367,6 +372,11 @@ class EvaluationContext {
     return mergedAggregators;
   }
 
+  /** Returns the metrics container for this pipeline. */
+  public DirectMetrics getMetrics() {
+    return metrics;
+  }
+
   @VisibleForTesting
   void forceRefresh() {
     watermarkManager.refreshAll();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index fab6a33..3761574 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -212,6 +212,7 @@ final class ExecutorServiceParallelExecutor implements 
PipelineExecutor {
 
     TransformExecutor<T> callable =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             enforcements,
             bundle,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
index 4972340..db92542 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutableListBundleFactory.java
@@ -123,5 +123,15 @@ class ImmutableListBundleFactory implements BundleFactory {
           ImmutableList.copyOf(elements),
           getSynchronizedProcessingOutputWatermark());
     }
+
+    @Override
+    public int hashCode() {
+      return System.identityHashCode(this);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return this == obj;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index 1829e4a..989109f 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -22,10 +22,10 @@ import com.google.common.collect.ImmutableList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
+import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -37,31 +37,6 @@ import org.joda.time.Instant;
  */
 @AutoValue
 public abstract class StepTransformResult implements TransformResult {
-  @Override
-  public abstract AppliedPTransform<?, ?, ?> getTransform();
-
-  @Override
-  public abstract Iterable<? extends UncommittedBundle<?>> getOutputBundles();
-
-  @Override
-  public abstract Iterable<? extends WindowedValue<?>> 
getUnprocessedElements();
-
-  @Override
-  @Nullable
-  public abstract AggregatorContainer.Mutator getAggregatorChanges();
-
-  @Override
-  public abstract Instant getWatermarkHold();
-
-  @Nullable
-  @Override
-  public abstract CopyOnAccessInMemoryStateInternals<?> getState();
-
-  @Override
-  public abstract TimerUpdate getTimerUpdate();
-
-  @Override
-  public abstract Set<OutputType> getOutputTypes();
 
   public static Builder withHold(AppliedPTransform<?, ?, ?> transform, Instant 
watermarkHold) {
     return new Builder(transform, watermarkHold);
@@ -71,6 +46,20 @@ public abstract class StepTransformResult implements 
TransformResult {
     return new Builder(transform, BoundedWindow.TIMESTAMP_MAX_VALUE);
   }
 
+  @Override
+  public TransformResult withLogicalMetricUpdates(MetricUpdates metricUpdates) 
{
+    return new AutoValue_StepTransformResult(
+        getTransform(),
+        getOutputBundles(),
+        getUnprocessedElements(),
+        getAggregatorChanges(),
+        metricUpdates,
+        getWatermarkHold(),
+        getState(),
+        getTimerUpdate(),
+        getOutputTypes());
+  }
+
   /**
    * A builder for creating instances of {@link StepTransformResult}.
    */
@@ -78,6 +67,7 @@ public abstract class StepTransformResult implements 
TransformResult {
     private final AppliedPTransform<?, ?, ?> transform;
     private final ImmutableList.Builder<UncommittedBundle<?>> bundlesBuilder;
     private final ImmutableList.Builder<WindowedValue<?>> 
unprocessedElementsBuilder;
+    private MetricUpdates metricUpdates;
     private CopyOnAccessInMemoryStateInternals<?> state;
     private TimerUpdate timerUpdate;
     private AggregatorContainer.Mutator aggregatorChanges;
@@ -91,6 +81,7 @@ public abstract class StepTransformResult implements 
TransformResult {
       this.producedOutputs = EnumSet.noneOf(OutputType.class);
       this.unprocessedElementsBuilder = ImmutableList.builder();
       this.timerUpdate = TimerUpdate.builder(null).build();
+      this.metricUpdates = MetricUpdates.EMPTY;
     }
 
     public StepTransformResult build() {
@@ -99,6 +90,7 @@ public abstract class StepTransformResult implements 
TransformResult {
           bundlesBuilder.build(),
           unprocessedElementsBuilder.build(),
           aggregatorChanges,
+          metricUpdates,
           watermarkHold,
           state,
           timerUpdate,
@@ -110,6 +102,11 @@ public abstract class StepTransformResult implements 
TransformResult {
       return this;
     }
 
+    public Builder withMetricUpdates(MetricUpdates metricUpdates) {
+      this.metricUpdates = metricUpdates;
+      return this;
+    }
+
     public Builder withState(CopyOnAccessInMemoryStateInternals<?> state) {
       this.state = state;
       return this;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index aaee9a5..03f615b 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -25,6 +25,9 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.metrics.MetricUpdates;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 
@@ -38,6 +41,7 @@ import org.apache.beam.sdk.util.WindowedValue;
  */
 class TransformExecutor<T> implements Runnable {
   public static <T> TransformExecutor<T> create(
+      EvaluationContext context,
       TransformEvaluatorFactory factory,
       Iterable<? extends ModelEnforcementFactory> modelEnforcements,
       CommittedBundle<T> inputBundle,
@@ -45,6 +49,7 @@ class TransformExecutor<T> implements Runnable {
       CompletionCallback completionCallback,
       TransformExecutorService transformEvaluationState) {
     return new TransformExecutor<>(
+        context,
         factory,
         modelEnforcements,
         inputBundle,
@@ -63,10 +68,12 @@ class TransformExecutor<T> implements Runnable {
 
   private final CompletionCallback onComplete;
   private final TransformExecutorService transformEvaluationState;
+  private final EvaluationContext context;
 
   private final AtomicReference<Thread> thread;
 
   private TransformExecutor(
+      EvaluationContext context,
       TransformEvaluatorFactory factory,
       Iterable<? extends ModelEnforcementFactory> modelEnforcements,
       CommittedBundle<T> inputBundle,
@@ -82,11 +89,14 @@ class TransformExecutor<T> implements Runnable {
     this.onComplete = completionCallback;
 
     this.transformEvaluationState = transformEvaluationState;
+    this.context = context;
     this.thread = new AtomicReference<>();
   }
 
   @Override
   public void run() {
+    MetricsContainer metricsContainer = new 
MetricsContainer(transform.getFullName());
+    MetricsEnvironment.setMetricsContainer(metricsContainer);
     checkState(
         thread.compareAndSet(null, Thread.currentThread()),
         "Tried to execute %s for %s on thread %s, but is already executing on 
thread %s",
@@ -108,9 +118,9 @@ class TransformExecutor<T> implements Runnable {
         return;
       }
 
-      processElements(evaluator, enforcements);
+      processElements(evaluator, metricsContainer, enforcements);
 
-      finishBundle(evaluator, enforcements);
+      finishBundle(evaluator, metricsContainer, enforcements);
     } catch (Throwable t) {
       onComplete.handleThrowable(inputBundle, t);
       if (t instanceof RuntimeException) {
@@ -118,6 +128,10 @@ class TransformExecutor<T> implements Runnable {
       }
       throw new RuntimeException(t);
     } finally {
+      // Report the physical metrics from the end of this step.
+      context.getMetrics().commitPhysical(inputBundle, 
metricsContainer.getCumulative());
+
+      MetricsEnvironment.unsetMetricsContainer();
       transformEvaluationState.complete(this);
     }
   }
@@ -127,7 +141,9 @@ class TransformExecutor<T> implements Runnable {
    * necessary {@link ModelEnforcement ModelEnforcements}.
    */
   private void processElements(
-      TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> 
enforcements)
+      TransformEvaluator<T> evaluator,
+      MetricsContainer metricsContainer,
+      Collection<ModelEnforcement<T>> enforcements)
       throws Exception {
     if (inputBundle != null) {
       for (WindowedValue<T> value : inputBundle.getElements()) {
@@ -137,6 +153,13 @@ class TransformExecutor<T> implements Runnable {
 
         evaluator.processElement(value);
 
+        // Report the physical metrics after each element
+        MetricUpdates deltas = metricsContainer.getUpdates();
+        if (deltas != null) {
+          context.getMetrics().updatePhysical(inputBundle, deltas);
+          metricsContainer.commitUpdates();
+        }
+
         for (ModelEnforcement<T> enforcement : enforcements) {
           enforcement.afterElement(value);
         }
@@ -152,9 +175,11 @@ class TransformExecutor<T> implements Runnable {
    *         {@link TransformEvaluator#finishBundle()}
    */
   private TransformResult finishBundle(
-      TransformEvaluator<T> evaluator, Collection<ModelEnforcement<T>> 
enforcements)
+      TransformEvaluator<T> evaluator, MetricsContainer metricsContainer,
+      Collection<ModelEnforcement<T>> enforcements)
       throws Exception {
-    TransformResult result = evaluator.finishBundle();
+    TransformResult result = evaluator.finishBundle()
+        .withLogicalMetricUpdates(metricsContainer.getCumulative());
     CommittedResult outputs = onComplete.handleResult(inputBundle, result);
     for (ModelEnforcement<T> enforcement : enforcements) {
       enforcement.afterFinish(inputBundle, result, outputs.getOutputs());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
index ba2d48e..ac1e395 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformResult.java
@@ -22,6 +22,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
+import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -57,6 +58,11 @@ public interface TransformResult {
   @Nullable AggregatorContainer.Mutator getAggregatorChanges();
 
   /**
+   * Returns the logical metric updates.
+   */
+  MetricUpdates getLogicalMetricUpdates();
+
+  /**
    * Returns the Watermark Hold for the transform at the time this result was produced.
    *
    * <p>If the transform does not set any watermark hold, returns
@@ -86,4 +92,10 @@ public interface TransformResult {
    * {@link OutputType#BUNDLE}, as empty bundles may be dropped when the transform is committed.
    */
   Set<OutputType> getOutputTypes();
+
+  /**
+   * Returns a new TransformResult based on this one but overwriting any existing logical metric
+   * updates with {@code metricUpdates}.
+   */
+  TransformResult withLogicalMetricUpdates(MetricUpdates metricUpdates);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
new file mode 100644
index 0000000..df01244
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult;
+import static org.apache.beam.sdk.metrics.MetricNameFilter.inNamespace;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.sdk.metrics.DistributionData;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricUpdates;
+import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link DirectMetrics}.
+ */
+@RunWith(JUnit4.class)
+public class DirectMetricsTest {
+
+  @Mock
+  private CommittedBundle<Object> bundle1;
+  @Mock
+  private CommittedBundle<Object> bundle2;
+
+  private static final MetricName NAME1 = MetricName.named("ns1", "name1");
+  private static final MetricName NAME2 = MetricName.named("ns1", "name2");
+  private static final MetricName NAME3 = MetricName.named("ns2", "name1");
+
+  private DirectMetrics metrics = new DirectMetrics();
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testApplyLogicalQueryNoFilter() {
+    metrics.commitLogical(bundle1, MetricUpdates.create(
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("step1", NAME1), 5L),
+            MetricUpdate.create(MetricKey.create("step1", NAME2), 8L)),
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("step1", NAME1),
+                DistributionData.create(8, 2, 3, 5)))));
+    metrics.commitLogical(bundle1, MetricUpdates.create(
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("step2", NAME1), 7L),
+            MetricUpdate.create(MetricKey.create("step1", NAME2), 4L)),
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("step1", NAME1),
+                DistributionData.create(4, 1, 4, 4)))));
+
+    MetricQueryResults results = metrics.queryMetrics(MetricsFilter.builder().build());
+    assertThat(results.counters(), containsInAnyOrder(
+        metricResult("ns1", "name1", "step1", 5L, 0L),
+        metricResult("ns1", "name2", "step1", 12L, 0L),
+        metricResult("ns1", "name1", "step2", 7L, 0L)));
+    assertThat(results.distributions(), contains(
+        metricResult("ns1", "name1", "step1",
+            DistributionResult.create(12, 3, 3, 5),
+            DistributionResult.ZERO)));
+  }
+
+  @Test
+  public void testApplyPhysicalCountersQueryOneNamespace() {
+    metrics.updatePhysical(bundle1, MetricUpdates.create(
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("step1", NAME1), 5L),
+            MetricUpdate.create(MetricKey.create("step1", NAME3), 8L)),
+        ImmutableList.<MetricUpdate<DistributionData>>of()));
+    metrics.updatePhysical(bundle1, MetricUpdates.create(
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("step2", NAME1), 7L),
+            MetricUpdate.create(MetricKey.create("step1", NAME3), 4L)),
+        ImmutableList.<MetricUpdate<DistributionData>>of()));
+
+    assertThat(metrics.queryMetrics(
+        MetricsFilter.builder().addNameFilter(inNamespace("ns1")).build()).counters(),
+        containsInAnyOrder(
+            metricResult("ns1", "name1", "step1", 0L, 5L),
+            metricResult("ns1", "name1", "step2", 0L, 7L)));
+  }
+
+  @Test
+  public void testApplyPhysicalQueryCompositeScope() {
+    metrics.updatePhysical(bundle1, MetricUpdates.create(
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 5L),
+            MetricUpdate.create(MetricKey.create("Outer1/Inner2", NAME1), 8L)),
+        ImmutableList.<MetricUpdate<DistributionData>>of()));
+    metrics.updatePhysical(bundle1, MetricUpdates.create(
+        ImmutableList.of(
+            MetricUpdate.create(MetricKey.create("Outer1/Inner1", NAME1), 12L),
+            MetricUpdate.create(MetricKey.create("Outer2/Inner2", NAME1), 18L)),
+        ImmutableList.<MetricUpdate<DistributionData>>of()));
+
+    assertThat(metrics.queryMetrics(
+        MetricsFilter.builder().addStep("Outer1").build()).counters(),
+        containsInAnyOrder(
+            metricResult("ns1", "name1", "Outer1/Inner1", 0L, 12L),
+            metricResult("ns1", "name1", "Outer1/Inner2", 0L, 8L)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 4768fb0..d93dd7a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isA;
@@ -35,12 +37,20 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Distribution;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
@@ -444,4 +454,30 @@ public class DirectRunnerTest implements Serializable {
       throw new CoderException("Cannot decode a long");
     }
   }
+
+  public void testMetrics() throws Exception {
+    Pipeline pipeline = getPipeline();
+    pipeline
+        .apply(Create.of(5, 8, 13))
+        .apply("MyStep", ParDo.of(new DoFn<Integer, Void>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            Counter count = Metrics.counter(DirectRunnerTest.class, "count");
+            Distribution values = Metrics.distribution(DirectRunnerTest.class, "input");
+
+            count.inc();
+            values.update(c.element());
+          }
+        }));
+    PipelineResult result = pipeline.run();
+    MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder()
+        .addNameFilter(MetricNameFilter.inNamespace(DirectRunnerTest.class))
+        .build());
+    assertThat(metrics.counters(), contains(
+        metricResult(DirectRunnerTest.class.getName(), "count", "MyStep", 3L, 3L)));
+    assertThat(metrics.distributions(), contains(
+        metricResult(DirectRunnerTest.class.getName(), "input", "MyStep",
+            DistributionResult.create(26L, 3L, 5L, 13L),
+            DistributionResult.create(26L, 3L, 5L, 13L))));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/834933c5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index c63e9bd..5015e5a 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -72,6 +72,7 @@ public class TransformExecutorTest {
   private RegisteringCompletionCallback completionCallback;
   private TransformExecutorService transformEvaluationState;
   private BundleFactory bundleFactory;
+  @Mock private DirectMetrics metrics;
   @Mock private EvaluationContext evaluationContext;
   @Mock private TransformEvaluatorRegistry registry;
 
@@ -90,6 +91,8 @@ public class TransformExecutorTest {
     TestPipeline p = TestPipeline.create();
     created = p.apply(Create.of("foo", "spam", "third"));
     downstream = created.apply(WithKeys.<Integer, String>of(3));
+
+    when(evaluationContext.getMetrics()).thenReturn(metrics);
   }
 
   @Test
@@ -116,6 +119,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<Object> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             null,
@@ -135,6 +139,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<Object> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             null,
@@ -177,6 +182,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<String> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             inputBundle,
@@ -219,6 +225,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<String> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             inputBundle,
@@ -254,6 +261,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<String> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             inputBundle,
@@ -294,6 +302,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<String> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>emptyList(),
             null,
@@ -335,6 +344,7 @@ public class TransformExecutorTest {
     TestEnforcementFactory enforcement = new TestEnforcementFactory();
     TransformExecutor<String> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>singleton(enforcement),
             inputBundle,
@@ -392,6 +402,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<byte[]> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()),
             inputBundle,
@@ -448,6 +459,7 @@ public class TransformExecutorTest {
 
     TransformExecutor<byte[]> executor =
         TransformExecutor.create(
+            evaluationContext,
             registry,
             Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()),
             inputBundle,


Reply via email to