Initial Metrics API for Beam Java

This includes a simple Counter metric and a Distribution metric that
reports the SUM, COUNT, MIN, MAX and MEAN of the reported values.

The API is labeled @Experimental since metrics will only be reported
and queryable with the DirectRunner, and the API may change as it is
implemented on other runners.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8524ed95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8524ed95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8524ed95

Branch: refs/heads/master
Commit: 8524ed9545f5af4bdeb54601f333549b34eb35aa
Parents: e969f3d
Author: bchambers <bchamb...@google.com>
Authored: Wed Oct 12 10:29:50 2016 -0700
Committer: bchambers <bchamb...@google.com>
Committed: Thu Oct 13 15:29:29 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/annotations/Experimental.java      |   3 +
 .../org/apache/beam/sdk/metrics/Counter.java    |  40 +++++
 .../apache/beam/sdk/metrics/CounterCell.java    |  76 ++++++++++
 .../org/apache/beam/sdk/metrics/DirtyState.java |  98 ++++++++++++
 .../apache/beam/sdk/metrics/Distribution.java   |  30 ++++
 .../beam/sdk/metrics/DistributionCell.java      |  58 +++++++
 .../beam/sdk/metrics/DistributionData.java      |  59 ++++++++
 .../beam/sdk/metrics/DistributionResult.java    |  42 ++++++
 .../org/apache/beam/sdk/metrics/Metric.java     |  24 +++
 .../org/apache/beam/sdk/metrics/MetricCell.java |  47 ++++++
 .../org/apache/beam/sdk/metrics/MetricKey.java  |  40 +++++
 .../org/apache/beam/sdk/metrics/MetricName.java |  46 ++++++
 .../beam/sdk/metrics/MetricNameFilter.java      |  60 ++++++++
 .../beam/sdk/metrics/MetricQueryResults.java    |  33 ++++
 .../apache/beam/sdk/metrics/MetricResult.java   |  45 ++++++
 .../apache/beam/sdk/metrics/MetricResults.java  |  34 +++++
 .../apache/beam/sdk/metrics/MetricUpdates.java  |  72 +++++++++
 .../org/apache/beam/sdk/metrics/Metrics.java    | 110 ++++++++++++++
 .../beam/sdk/metrics/MetricsContainer.java      | 150 +++++++++++++++++++
 .../beam/sdk/metrics/MetricsEnvironment.java    |  85 +++++++++++
 .../apache/beam/sdk/metrics/MetricsFilter.java  |  86 +++++++++++
 .../org/apache/beam/sdk/metrics/MetricsMap.java |  86 +++++++++++
 .../apache/beam/sdk/metrics/package-info.java   |  28 ++++
 .../beam/sdk/metrics/CounterCellTest.java       |  55 +++++++
 .../apache/beam/sdk/metrics/DirtyStateTest.java |  56 +++++++
 .../beam/sdk/metrics/DistributionCellTest.java  |  53 +++++++
 .../apache/beam/sdk/metrics/MetricMatchers.java |  99 ++++++++++++
 .../beam/sdk/metrics/MetricsContainerTest.java  | 129 ++++++++++++++++
 .../sdk/metrics/MetricsEnvironmentTest.java     |  63 ++++++++
 .../apache/beam/sdk/metrics/MetricsMapTest.java | 103 +++++++++++++
 .../apache/beam/sdk/metrics/MetricsTest.java    |  98 ++++++++++++
 31 files changed, 2008 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index 789f4b2..14d2358 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -83,5 +83,8 @@ public @interface Experimental {
      * Do not use: API is unstable and runner support is incomplete.
      */
     SPLITTABLE_DO_FN,
+
+    /** Metrics-related experimental APIs. */
+    METRICS
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java
new file mode 100644
index 0000000..9f48016
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A metric that reports a single long value and can be incremented or 
decremented.
+ */
+@Experimental(Kind.METRICS)
+public interface Counter extends Metric {
+
+  /** Increment the counter. */
+  void inc();
+
+  /** Increment the counter by the given amount. */
+  void inc(long n);
+
+  /** Decrement the counter. */
+  void dec();
+
+  /** Decrement the counter by the given amount. */
+  void dec(long n);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
new file mode 100644
index 0000000..bb65833
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Tracks the current value (and delta) for a Counter metric for a specific 
context and bundle.
+ *
+ * <p>This class generally shouldn't be used directly. The only exception is 
within a runner where
+ * a counter is being reported for a specific step (rather than the counter in 
the current context).
+ */
+@Experimental(Kind.METRICS)
+class CounterCell implements MetricCell<Counter, Long>, Counter {
+
+  private final DirtyState dirty = new DirtyState();
+  private final AtomicLong value = new AtomicLong();
+
+  /** Increment the counter by the given amount. */
+  private void add(long n) {
+    value.addAndGet(n);
+    dirty.afterModification();
+  }
+
+  @Override
+  public DirtyState getDirty() {
+    return dirty;
+  }
+
+  @Override
+  public Long getCumulative() {
+    return value.get();
+  }
+
+  @Override
+  public Counter getInterface() {
+    return this;
+  }
+
+  @Override
+  public void inc() {
+    add(1);
+  }
+
+  @Override
+  public void inc(long n) {
+    add(n);
+  }
+
+  @Override
+  public void dec() {
+    add(-1);
+  }
+
+  @Override
+  public void dec(long n) {
+    add(-n);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
new file mode 100644
index 0000000..6706be8
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Atomically tracks the dirty-state of a metric.
+ *
+ * <p>Reporting an update is split into two parts such that only changes made 
before the call to
+ * {@link #beforeCommit()} are committed when {@link #afterCommit()} is 
invoked. This allows for
+ * a two-step commit process of gathering all the dirty updates (calling 
{@link #beforeCommit()})
+ * followed by committing and calling {@link #afterCommit()}.
+ *
+ * <p>The tracking of dirty states is done conservatively -- sometimes {@link 
#beforeCommit()}
+ * will return true (indicating a dirty metric) even if there have been no 
changes since the last
+ * commit.
+ *
+ * <p>There is also a possible race when the underlying metric is modified but 
the call to
+ * {@link #afterModification()} hasn't happened before the call to {@link 
#beforeCommit()}. In this
+ * case the next round of metric updating will see the changes. If this was 
for the final commit,
+ * then the metric updates shouldn't be extracted until all possible user 
modifications have
+ * completed.
+ */
+@Experimental(Kind.METRICS)
+class DirtyState {
+  private enum State {
+    /** Indicates that there have been changes to the MetricCell since last 
commit. */
+    DIRTY,
+    /** Indicates that there have been no changes to the MetricCell since last 
commit. */
+    CLEAN,
+    /** Indicates that a commit of the current value is in progress. */
+    COMMITTING
+  }
+
+  private final AtomicReference<State> dirty = new 
AtomicReference<>(State.DIRTY);
+
+  /**
+   * Indicate that changes have been made to the metric being tracked by this 
{@link DirtyState}.
+   *
+   * <p>Should be called <b>after</b> modification of the value.
+   */
+  public void afterModification() {
+    dirty.set(State.DIRTY);
+  }
+
+  /**
+   * Check the dirty state and mark the metric as committing.
+   *
+   * <p>If the state was {@code CLEAN}, this returns {@code false}. If the 
state was {@code DIRTY}
+   * or {@code COMMITTING} this returns {@code true} and sets the state to 
{@code COMMITTING}.
+   *
+   * @return {@code false} if the state is clean and {@code true} otherwise.
+   */
+  public boolean beforeCommit() {
+    // After this loop, we want the state to be either CLEAN or COMMITTING.
+    // If the state was CLEAN, we don't need to do anything (and exit the loop 
early)
+    // If the state was DIRTY, we will attempt to do a CAS(DIRTY, COMMITTING). 
This will only
+    // fail if another thread is getting updates which generally shouldn't be 
the case.
+    // If the state was COMMITTING, we will attempt to do a CAS(COMMITTING, 
COMMITTING). This will
+    // fail if another thread commits updates (which shouldn't be the case) or 
if the user code
+    // updates the metric, in which case it will transition to DIRTY and the 
next iteration will
+    // successfully update it.
+    State state;
+    do {
+      state = dirty.get();
+    } while (state != State.CLEAN && !dirty.compareAndSet(state, 
State.COMMITTING));
+
+    return state != State.CLEAN;
+  }
+
+  /**
+   * Mark any changes up to the most recent call to {@link #beforeCommit()} 
as committed.
+   * The next call to {@link #beforeCommit()} will return {@code false} unless 
there have
+   * been changes made since the previous call to {@link #beforeCommit()}.
+   */
+  public void afterCommit() {
+    dirty.compareAndSet(State.COMMITTING, State.CLEAN);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
new file mode 100644
index 0000000..b789020
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A metric that reports information about the distribution of reported values.
+ */
+@Experimental(Kind.METRICS)
+public interface Distribution extends Metric {
+  /** Add an observation to this distribution. */
+  void update(long value);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
new file mode 100644
index 0000000..f0074a9
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Tracks the current value (and delta) for a Distribution metric.
+ */
+@Experimental(Kind.METRICS)
+class DistributionCell implements MetricCell<Distribution, DistributionData>, 
Distribution {
+
+  private final DirtyState dirty = new DirtyState();
+  private final AtomicReference<DistributionData> value =
+      new AtomicReference<DistributionData>(DistributionData.EMPTY);
+
+  /** Add the given observation to this distribution. */
+  @Override
+  public void update(long n) {
+    DistributionData original;
+    do {
+      original = value.get();
+    } while (!value.compareAndSet(original, 
original.combine(DistributionData.singleton(n))));
+    dirty.afterModification();
+  }
+
+  @Override
+  public DirtyState getDirty() {
+    return dirty;
+  }
+
+  @Override
+  public DistributionData getCumulative() {
+    return value.get();
+  }
+
+  @Override
+  public Distribution getInterface() {
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
new file mode 100644
index 0000000..59c7fbd
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+
+/**
+ * Data describing the distribution. This should retain enough detail that 
it can be combined
+ * with other {@link DistributionData}.
+ *
+ * <p>This is kept distinct from {@link DistributionResult} since this may be 
extended to include
+ * data necessary to approximate quantiles, etc. while {@link 
DistributionResult} would just include
+ * the approximate value of those quantiles.
+ */
+@AutoValue
+public abstract class DistributionData {
+
+  public abstract long sum();
+  public abstract long count();
+  public abstract long min();
+  public abstract long max();
+
+  public static final DistributionData EMPTY = create(0, 0, Long.MAX_VALUE, 
Long.MIN_VALUE);
+
+  public static DistributionData create(long sum, long count, long min, long 
max) {
+    return new AutoValue_DistributionData(sum, count, min, max);
+  }
+
+  public static DistributionData singleton(long value) {
+    return create(value, 1, value, value);
+  }
+
+  public DistributionData combine(DistributionData value) {
+    return create(
+        sum() + value.sum(),
+        count() + value.count(),
+        Math.min(value.min(), min()),
+        Math.max(value.max(), max()));
+  }
+
+  public DistributionResult extractResult() {
+    return DistributionResult.create(sum(), count(), min(), max());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java
new file mode 100644
index 0000000..27c242c
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+
+/**
+ * The result of a {@link Distribution} metric.
+ */
+@AutoValue
+public abstract class DistributionResult {
+
+  public abstract long sum();
+  public abstract long count();
+  public abstract long min();
+  public abstract long max();
+
+  public double mean() {
+    return (1.0 * sum()) / count();
+  }
+
+  public static final DistributionResult ZERO = create(0, 0, Long.MAX_VALUE, 
Long.MIN_VALUE);
+
+  public static DistributionResult create(long sum, long count, long min, long 
max) {
+    return new AutoValue_DistributionResult(sum, count, min, max);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
new file mode 100644
index 0000000..37a5f65
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+/**
+ * Marker interface for all user-facing metrics.
+ */
+public interface Metric { }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
new file mode 100644
index 0000000..211b2dd
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A {@link MetricCell} is used for accumulating in-memory changes to a 
metric. It represents a
+ * specific metric name in a single context.
+ *
+ * @param <UserT> The type of the user interface for reporting changes to this 
cell.
+ * @param <DataT> The type of metric data stored (and extracted) from this 
cell.
+ */
+@Experimental(Kind.METRICS)
+interface MetricCell<UserT extends Metric, DataT> {
+
+  /**
+   * Return the {@link DirtyState} tracking whether this metric cell contains 
uncommitted changes.
+   */
+  DirtyState getDirty();
+
+  /**
+   * Return the cumulative value of this metric.
+   */
+  DataT getCumulative();
+
+  /**
+   * Return the user-facing mutator for this cell.
+   */
+  UserT getInterface();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
new file mode 100644
index 0000000..bfa4df5
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Metrics are keyed by the step name they are associated with and the name of 
the metric.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricKey {
+
+  /** The step name that is associated with this metric. */
+  public abstract String stepName();
+
+  /** The name of the metric. */
+  public abstract MetricName metricName();
+
+  public static MetricKey create(String stepName, MetricName metricName) {
+    return new AutoValue_MetricKey(stepName, metricName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java
new file mode 100644
index 0000000..843a885
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * The name of a metric consists of a {@link #namespace} and a {@link #name}. 
The {@link #namespace}
+ * allows grouping related metrics together and also prevents collisions 
between multiple metrics
+ * with the same name.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricName {
+
+  /** The namespace associated with this metric. */
+  public abstract String namespace();
+
+  /** The name of this metric. */
+  public abstract String name();
+
+  public static MetricName named(String namespace, String name) {
+    return new AutoValue_MetricName(namespace, name);
+  }
+
+  public static MetricName named(Class<?> namespace, String name) {
+    return new AutoValue_MetricName(namespace.getName(), name);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java
new file mode 100644
index 0000000..a2c3798
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A filter over metric names: it always constrains the namespace and may additionally constrain
+ * the name.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricNameFilter {
+
+  /** The namespace that a metric must be in to match this {@link MetricNameFilter}. */
+  public abstract String getNamespace();
+
+  /** If set, the metric must have this name to match this {@link MetricNameFilter}. */
+  @Nullable
+  public abstract String getName();
+
+  /** Creates a filter on the given namespace only; the name is left unset. */
+  public static MetricNameFilter inNamespace(String namespace) {
+    return new AutoValue_MetricNameFilter(namespace, null);
+  }
+
+  /**
+   * Creates a filter on the namespace given by {@link Class#getName()} of {@code namespace}; the
+   * name is left unset.
+   */
+  public static MetricNameFilter inNamespace(Class<?> namespace) {
+    return new AutoValue_MetricNameFilter(namespace.getName(), null);
+  }
+
+  /**
+   * Creates a filter on both the given namespace and the given name.
+   *
+   * @throws NullPointerException if {@code name} is null
+   */
+  public static MetricNameFilter named(String namespace, String name) {
+    checkNotNull(name, "Must specify a name");
+    return new AutoValue_MetricNameFilter(namespace, name);
+  }
+
+  /**
+   * Creates a filter on both the namespace given by {@link Class#getName()} of {@code namespace}
+   * and the given name.
+   *
+   * @throws NullPointerException if {@code namespace} or {@code name} is null
+   */
+  public static MetricNameFilter named(Class<?> namespace, String name) {
+    checkNotNull(namespace, "Must specify a namespace");
+    checkNotNull(name, "Must specify a name");
+    // Use getName(), not getSimpleName(): inNamespace(Class) above derives the namespace with
+    // getName(), and a simple name would produce a different namespace string for the same class.
+    return new AutoValue_MetricNameFilter(namespace.getName(), name);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
new file mode 100644
index 0000000..2241ba8
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * The outcome of a metrics query. Gives access to every metric that matched the filter, grouped
+ * by the kind of metric.
+ */
+@Experimental(Kind.METRICS)
+public interface MetricQueryResults {
+  /** Return the results for the counter metrics that matched the filter. */
+  Iterable<MetricResult<Long>> counters();
+
+  /** Return the results for the distribution metrics that matched the filter. */
+  Iterable<MetricResult<DistributionResult>> distributions();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java
new file mode 100644
index 0000000..9a3971a
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * The current result of a single metric.
+ *
+ * @param <T> the type of the value reported for the metric
+ */
+@Experimental(Kind.METRICS)
+public interface MetricResult<T> {
+  /** Return the name of the metric. */
+  MetricName name();
+
+  /** Return the step context to which this metric result applies. */
+  String step();
+
+  /**
+   * Return the value of this metric across all successfully completed parts of the pipeline.
+   *
+   * <p>Not every runner supports committed metrics.
+   *
+   * @throws UnsupportedOperationException if the runner does not support committed metrics
+   */
+  T committed();
+
+  /**
+   * Return the value of this metric across all attempts of executing all parts of the pipeline.
+   */
+  T attempted();
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java
new file mode 100644
index 0000000..dab65ea
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Entry point for inspecting the metrics of a pipeline that has been executed. An instance is
+ * obtained via {@link PipelineResult#metrics()}.
+ */
+@Experimental(Kind.METRICS)
+public abstract class MetricResults {
+  /** Query for all metrics that match the given {@code filter}. */
+  public abstract MetricQueryResults queryMetrics(MetricsFilter filter);
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
new file mode 100644
index 0000000..e84dc66
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.Iterables;
+import java.util.Collections;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Representation of multiple metric updates: one collection of counter updates and one collection
+ * of distribution updates.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricUpdates {
+
+  /** A {@link MetricUpdates} that contains no counter updates and no distribution updates. */
+  public static final MetricUpdates EMPTY = MetricUpdates.create(
+      Collections.<MetricUpdate<Long>>emptyList(),
+      Collections.<MetricUpdate<DistributionData>>emptyList());
+
+  /**
+   * Representation of a single metric update.
+   * @param <T> The type of value representing the update.
+   */
+  @AutoValue
+  public abstract static class MetricUpdate<T> {
+
+    /** The key being updated. */
+    public abstract MetricKey getKey();
+
+    /** The value of the update. */
+    public abstract T getUpdate();
+
+    /** Create a {@link MetricUpdate} pairing the given {@code key} with its {@code update}. */
+    public static <T> MetricUpdate<T> create(MetricKey key, T update) {
+      // Diamond instead of a raw type, so the result is a checked MetricUpdate<T> rather than an
+      // unchecked conversion from the raw generated class.
+      return new AutoValue_MetricUpdates_MetricUpdate<>(key, update);
+    }
+  }
+
+  /** Returns true if there are no updates in this MetricUpdates object. */
+  public boolean isEmpty() {
+    return Iterables.isEmpty(counterUpdates())
+        && Iterables.isEmpty(distributionUpdates());
+  }
+
+  /** All of the counter updates. */
+  public abstract Iterable<MetricUpdate<Long>> counterUpdates();
+
+  /** All of the distribution updates. */
+  public abstract Iterable<MetricUpdate<DistributionData>> distributionUpdates();
+
+  /** Create a new {@link MetricUpdates} bundle. */
+  public static MetricUpdates create(
+      Iterable<MetricUpdate<Long>> counterUpdates,
+      Iterable<MetricUpdate<DistributionData>> distributionUpdates) {
+    return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
new file mode 100644
index 0000000..b72a0b2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Utility class with static factories for the metrics that user code can use to report
+ * properties of an executing pipeline.
+ */
+@Experimental(Kind.METRICS)
+public class Metrics {
+
+  private Metrics() {}
+
+  /**
+   * Create a metric that can be incremented and decremented, and is aggregated by taking the sum.
+   */
+  public static Counter counter(String namespace, String name) {
+    MetricName metricName = MetricName.named(namespace, name);
+    return new DelegatingCounter(metricName);
+  }
+
+  /**
+   * Create a metric that can be incremented and decremented, and is aggregated by taking the sum.
+   */
+  public static Counter counter(Class<?> namespace, String name) {
+    MetricName metricName = MetricName.named(namespace, name);
+    return new DelegatingCounter(metricName);
+  }
+
+  /**
+   * Create a metric that records various statistics about the distribution of reported values.
+   */
+  public static Distribution distribution(String namespace, String name) {
+    MetricName metricName = MetricName.named(namespace, name);
+    return new DelegatingDistribution(metricName);
+  }
+
+  /**
+   * Create a metric that records various statistics about the distribution of reported values.
+   */
+  public static Distribution distribution(Class<?> namespace, String name) {
+    MetricName metricName = MetricName.named(namespace, name);
+    return new DelegatingDistribution(metricName);
+  }
+
+  /**
+   * {@link Counter} that looks up the container for the current context on every update and
+   * forwards the update to the cell registered there under this counter's name.
+   */
+  private static class DelegatingCounter implements Counter {
+    private final MetricName name;
+
+    private DelegatingCounter(MetricName name) {
+      this.name = name;
+    }
+
+    /** Increment the counter by one. */
+    @Override
+    public void inc() {
+      inc(1);
+    }
+
+    /**
+     * Increment the counter by the given amount. Does nothing when
+     * {@link MetricsEnvironment#getCurrentContainer()} returns null.
+     */
+    @Override
+    public void inc(long n) {
+      MetricsContainer current = MetricsEnvironment.getCurrentContainer();
+      if (current == null) {
+        return;
+      }
+      current.getCounter(name).inc(n);
+    }
+
+    /** Decrement the counter by one. */
+    @Override
+    public void dec() {
+      inc(-1);
+    }
+
+    /** Decrement the counter by the given amount. */
+    @Override
+    public void dec(long n) {
+      inc(-n);
+    }
+  }
+
+  /**
+   * {@link Distribution} that looks up the container for the current context on every update and
+   * forwards the update to the cell registered there under this distribution's name.
+   */
+  private static class DelegatingDistribution implements Distribution {
+    private final MetricName name;
+
+    private DelegatingDistribution(MetricName name) {
+      this.name = name;
+    }
+
+    /**
+     * Report a value to the distribution. Does nothing when
+     * {@link MetricsEnvironment#getCurrentContainer()} returns null.
+     */
+    @Override
+    public void update(long value) {
+      MetricsContainer current = MetricsEnvironment.getCurrentContainer();
+      if (current == null) {
+        return;
+      }
+      current.getDistribution(name).update(value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
new file mode 100644
index 0000000..10032a2
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Map;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
+
+/**
+ * Holds the metrics for a single step and unit-of-commit (bundle).
+ *
+ * <p>This class is thread-safe. It is intended to be used with one or more threads updating
+ * metrics and at most one thread extracting updates by calling {@link #getUpdates} and
+ * {@link #commitUpdates}. Outside of this it is still safe. Although races in the update
+ * extraction may cause updates that don't actually have any changes, it will never lose an
+ * update.
+ *
+ * <p>For consistency, all threads that update metrics should finish before getting the final
+ * cumulative values/updates.
+ */
+@Experimental(Kind.METRICS)
+public class MetricsContainer {
+
+  private final String stepName;
+
+  // Cells are created on first access through the factory; the maps themselves are never replaced.
+  private final MetricsMap<MetricName, CounterCell> counters =
+      new MetricsMap<>(new MetricsMap.Factory<MetricName, CounterCell>() {
+        @Override
+        public CounterCell createInstance(MetricName unusedKey) {
+          return new CounterCell();
+        }
+      });
+
+  private final MetricsMap<MetricName, DistributionCell> distributions =
+      new MetricsMap<>(new MetricsMap.Factory<MetricName, DistributionCell>() {
+        @Override
+        public DistributionCell createInstance(MetricName unusedKey) {
+          return new DistributionCell();
+        }
+      });
+
+  /**
+   * Create a new {@link MetricsContainer} associated with the given {@code stepName}.
+   */
+  public MetricsContainer(String stepName) {
+    this.stepName = stepName;
+  }
+
+  /**
+   * Return the {@link CounterCell} that should be used for implementing the given
+   * {@code metricName} in this container.
+   */
+  public CounterCell getCounter(MetricName metricName) {
+    return counters.get(metricName);
+  }
+
+  /**
+   * Return the {@link DistributionCell} that should be used for implementing the given
+   * {@code metricName} in this container.
+   */
+  public DistributionCell getDistribution(MetricName metricName) {
+    return distributions.get(metricName);
+  }
+
+  /**
+   * Collect the cumulative value of every cell whose dirty state reports, via
+   * {@code beforeCommit()}, that it should be included, keyed by this container's step name and
+   * the cell's metric name.
+   */
+  private <UpdateT, CellT extends MetricCell<?, UpdateT>>
+  ImmutableList<MetricUpdate<UpdateT>> extractUpdates(
+      MetricsMap<MetricName, CellT> cells) {
+    ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
+    for (Map.Entry<MetricName, CellT> cell : cells.entries()) {
+      if (cell.getValue().getDirty().beforeCommit()) {
+        updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()),
+            cell.getValue().getCumulative()));
+      }
+    }
+    return updates.build();
+  }
+
+  /**
+   * Return the cumulative values for any metrics that have changed since the last time updates
+   * were committed.
+   */
+  public MetricUpdates getUpdates() {
+    return MetricUpdates.create(
+        extractUpdates(counters),
+        extractUpdates(distributions));
+  }
+
+  /** Invoke {@code afterCommit()} on the dirty state of every cell in the given map. */
+  private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?, ?>> cells) {
+    for (MetricCell<?, ?> cell : cells.values()) {
+      cell.getDirty().afterCommit();
+    }
+  }
+
+  /**
+   * Mark all of the updates that were retrieved with the latest call to {@link #getUpdates()} as
+   * committed.
+   */
+  public void commitUpdates() {
+    commitUpdates(counters);
+    commitUpdates(distributions);
+  }
+
+  /**
+   * Collect the cumulative value of every cell in the given map, regardless of its dirty state,
+   * keyed by this container's step name and the cell's metric name.
+   */
+  private <UpdateT, CellT extends MetricCell<?, UpdateT>>
+  ImmutableList<MetricUpdate<UpdateT>> extractCumulatives(
+      MetricsMap<MetricName, CellT> cells) {
+    ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
+    for (Map.Entry<MetricName, CellT> cell : cells.entries()) {
+      UpdateT update = checkNotNull(cell.getValue().getCumulative());
+      updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()), update));
+    }
+    return updates.build();
+  }
+
+  /**
+   * Return the {@link MetricUpdates} representing the cumulative values of all metrics in this
+   * container.
+   */
+  public MetricUpdates getCumulative() {
+    // extractCumulatives already walks every cell; no separate hand-built lists are needed.
+    return MetricUpdates.create(
+        extractCumulatives(counters),
+        extractCumulatives(distributions));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
new file mode 100644
index 0000000..ef2660a8
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages and provides the metrics container associated with each thread.
+ *
+ * <p>Users should not interact directly with this class. Instead, use {@link Metrics} and the
+ * returned objects to create and modify metrics.
+ *
+ * <p>The runner should create {@link MetricsContainer} for each context in which metrics are
+ * reported (by step and name) and call {@link #setMetricsContainer} before invoking any code that
+ * may update metrics within that step.
+ *
+ * <p>The runner should call {@link #unsetMetricsContainer} (or {@link #setMetricsContainer} back
+ * to the previous value) when exiting code that set the metrics container.
+ */
+public class MetricsEnvironment {
+
+  // Log under this class's own name so the diagnostics below are attributed to their real source.
+  private static final Logger LOGGER = LoggerFactory.getLogger(MetricsEnvironment.class);
+
+  private static final AtomicBoolean METRICS_SUPPORTED = new AtomicBoolean(false);
+  // Shared by all threads: the missing-container diagnostic is emitted at most once per process.
+  private static final AtomicBoolean REPORTED_MISSING_CONTAINER = new AtomicBoolean(false);
+
+  private static final ThreadLocal<MetricsContainer> CONTAINER_FOR_THREAD =
+      new ThreadLocal<MetricsContainer>();
+
+  /** Set the {@link MetricsContainer} for the current thread. */
+  public static void setMetricsContainer(MetricsContainer container) {
+    CONTAINER_FOR_THREAD.set(container);
+  }
+
+  /** Clear the {@link MetricsContainer} for the current thread. */
+  public static void unsetMetricsContainer() {
+    CONTAINER_FOR_THREAD.remove();
+  }
+
+  /** Called by the runner to indicate whether metrics reporting is supported. */
+  public static void setMetricsSupported(boolean supported) {
+    METRICS_SUPPORTED.set(supported);
+  }
+
+  /**
+   * Return the {@link MetricsContainer} for the current thread.
+   *
+   * <p>May return null if metrics are not supported by the current runner or if the current
+   * thread is not a work-execution thread. The first time this happens, on any thread, a
+   * diagnostic message is logged; later occurrences are not logged again.
+   */
+  @Nullable
+  public static MetricsContainer getCurrentContainer() {
+    MetricsContainer container = CONTAINER_FOR_THREAD.get();
+    if (container == null && REPORTED_MISSING_CONTAINER.compareAndSet(false, true)) {
+      if (METRICS_SUPPORTED.get()) {
+        LOGGER.error(
+            "Unable to update metrics on the current thread. "
+                + "Most likely caused by using metrics outside the managed work-execution thread.");
+      } else {
+        LOGGER.warn("Reporting metrics are not supported in the current execution environment.");
+      }
+    }
+    return container;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java
new file mode 100644
index 0000000..ec81251
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * Simple value object describing which metrics a query should return, in terms of step filters
+ * and name filters.
+ */
+@Experimental(Kind.METRICS)
+@AutoValue
+public abstract class MetricsFilter {
+
+  /** The step filters that were added to this filter. */
+  public Set<String> steps() {
+    return immutableSteps();
+  }
+
+  /** The name filters that were added to this filter. */
+  public Set<MetricNameFilter> names() {
+    return immutableNames();
+  }
+
+  protected abstract ImmutableSet<String> immutableSteps();
+  protected abstract ImmutableSet<MetricNameFilter> immutableNames();
+
+  /** Start building a new {@link MetricsFilter}. */
+  public static Builder builder() {
+    return new AutoValue_MetricsFilter.Builder();
+  }
+
+  /**
+   * Builder for creating a {@link MetricsFilter}.
+   */
+  @AutoValue.Builder
+  public abstract static class Builder {
+
+    protected abstract ImmutableSet.Builder<MetricNameFilter> immutableNamesBuilder();
+    protected abstract ImmutableSet.Builder<String> immutableStepsBuilder();
+
+    /**
+     * Add a {@link MetricNameFilter}.
+     *
+     * <p>If no name filters are specified then all metric names will be included.
+     *
+     * <p>If one or more name filters are specified, then only metrics that match one or more of
+     * the filters will be included.
+     */
+    public Builder addNameFilter(MetricNameFilter nameFilter) {
+      ImmutableSet.Builder<MetricNameFilter> namesBuilder = immutableNamesBuilder();
+      namesBuilder.add(nameFilter);
+      return this;
+    }
+
+    /**
+     * Add a step filter.
+     *
+     * <p>If no steps are specified then metrics will be included for all steps.
+     *
+     * <p>If one or more steps are specified, then metrics will be included if they are part of
+     * any of the specified steps.
+     */
+    public Builder addStep(String step) {
+      ImmutableSet.Builder<String> stepsBuilder = immutableStepsBuilder();
+      stepsBuilder.add(step);
+      return this;
+    }
+
+    /** Build the {@link MetricsFilter} from the filters added so far. */
+    public abstract MetricsFilter build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
new file mode 100644
index 0000000..5a02106
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.metrics;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Iterables;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+
+/**
+ * A map from {@code K} to {@code T} that supports getting or creating values 
associated with a key
+ * in a thread-safe manner.
+ */
+@Experimental(Kind.METRICS)
+public class MetricsMap<K, T> {
+
+  /** Interface for creating instances to populate the {@link MetricsMap}. */
+  public interface Factory<K, T> {
+    /**
+     * Create an instance of {@code T} to use with the given {@code key}.
+     *
+     * <p>It must be safe to call this from multiple threads.
+     */
+    T createInstance(K key);
+  }
+
+  private final Factory<K, T> factory;
+  private final ConcurrentMap<K, T> metrics = new ConcurrentHashMap<>();
+
+  public MetricsMap(Factory<K, T> factory) {
+    this.factory = factory;
+  }
+
+  /**
+   * Get or create the value associated with the given key.
+   */
+  public T get(K key) {
+    T metric = metrics.get(key);
+    if (metric == null) {
+      metric = factory.createInstance(key);
+      metric = MoreObjects.firstNonNull(metrics.putIfAbsent(key, metric), 
metric);
+    }
+    return metric;
+  }
+
+  /**
+   * Get the value associated with the given key, if it exists.
+   */
+  @Nullable
+  public T tryGet(K key) {
+    return metrics.get(key);
+  }
+
+  /**
+   * Return an iterable over the entries in the current {@link MetricsMap}.
+   */
+  public Iterable<Map.Entry<K, T>> entries() {
+    return Iterables.unmodifiableIterable(metrics.entrySet());
+  }
+
+  /**
+   * Return an iterable over the values in the current {@link MetricsMap}.
+   */
+  public Iterable<T> values() {
+    return Iterables.unmodifiableIterable(metrics.values());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java
new file mode 100644
index 0000000..f71dc7a
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Metrics allow exporting information about the execution of a pipeline.
+ * They are intended to be used for monitoring and understanding the
+ * execution.
+ *
+ * <p>Metrics may also be queried from the {@link 
org.apache.beam.sdk.PipelineResult} object.
+ *
+ * <p>Runners should look at {@link 
org.apache.beam.sdk.metrics.MetricsContainer} for details on
+ * how to support metrics.
+ */
+package org.apache.beam.sdk.metrics;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
new file mode 100644
index 0000000..408f145
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link CounterCell}.
+ */
+@RunWith(JUnit4.class)
+public class CounterCellTest {
+
+  private CounterCell cell = new CounterCell();
+
+  @Test
+  public void testDeltaAndCumulative() {
+    cell.inc(5);
+    cell.inc(7);
+    assertThat(cell.getCumulative(), equalTo(12L));
+    assertThat("getCumulative is idempotent", cell.getCumulative(), 
equalTo(12L));
+
+    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+    cell.getDirty().afterCommit();
+    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+    assertThat(cell.getCumulative(), equalTo(12L));
+
+    cell.inc(30);
+    assertThat(cell.getCumulative(), equalTo(42L));
+
+    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+    cell.getDirty().afterCommit();
+    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java
new file mode 100644
index 0000000..d00f8cd
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DirtyState}.
+ */
+@RunWith(JUnit4.class)
+public class DirtyStateTest {
+
+  private final DirtyState dirty = new DirtyState();
+
+  @Test
+  public void basicPath() {
+    assertThat("Should start dirty", dirty.beforeCommit(), is(true));
+    dirty.afterCommit();
+    assertThat("Should be clean after commit", dirty.beforeCommit(), 
is(false));
+
+    dirty.afterModification();
+    assertThat("Should be dirty after change", dirty.beforeCommit(), is(true));
+    dirty.afterCommit();
+    assertThat("Should be clean after commit", dirty.beforeCommit(), 
is(false));
+  }
+
+  @Test
+  public void changeAfterBeforeCommit() {
+    assertThat("Should start dirty", dirty.beforeCommit(), is(true));
+    dirty.afterModification();
+    dirty.afterCommit();
+    assertThat("Changes after beforeCommit should be dirty after afterCommit",
+        dirty.beforeCommit(), is(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java
new file mode 100644
index 0000000..07e0b26
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DistributionCell}.
+ */
+@RunWith(JUnit4.class)
+public class DistributionCellTest {
+  private DistributionCell cell = new DistributionCell();
+
+  @Test
+  public void testDeltaAndCumulative() {
+    cell.update(5);
+    cell.update(7);
+    assertThat(cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 
7)));
+    assertThat("getCumulative is idempotent",
+        cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 7)));
+
+    assertThat(cell.getDirty().beforeCommit(), equalTo(true));
+    cell.getDirty().afterCommit();
+    assertThat(cell.getDirty().beforeCommit(), equalTo(false));
+
+    cell.update(30);
+    assertThat(cell.getCumulative(), equalTo(DistributionData.create(42, 3, 5, 
30)));
+
+    assertThat("Adding a new value made the cell dirty",
+        cell.getDirty().beforeCommit(), equalTo(true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
new file mode 100644
index 0000000..bdcb94f
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import java.util.Objects;
+import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+
+/**
+ * Matchers for metrics.
+ */
+public class MetricMatchers {
+
+  public static <T> Matcher<MetricUpdate<T>> metricUpdate(final String name, 
final T update) {
+    return new TypeSafeMatcher<MetricUpdate<T>>() {
+      @Override
+      protected boolean matchesSafely(MetricUpdate<T> item) {
+        return Objects.equals(name, item.getKey().metricName().name())
+            && Objects.equals(update, item.getUpdate());
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description
+            .appendText("MetricUpdate{name=").appendValue(name)
+            .appendText(", update=").appendValue(update)
+            .appendText("}");
+      }
+    };
+  }
+
+  public static <T> Matcher<MetricUpdate<T>> metricUpdate(
+      final String namespace, final String name, final String step, final T 
update) {
+    return new TypeSafeMatcher<MetricUpdate<T>>() {
+      @Override
+      protected boolean matchesSafely(MetricUpdate<T> item) {
+        return Objects.equals(namespace, 
item.getKey().metricName().namespace())
+            && Objects.equals(name, item.getKey().metricName().name())
+            && Objects.equals(step, item.getKey().stepName())
+            && Objects.equals(update, item.getUpdate());
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description
+            .appendText("MetricUpdate{inNamespace=").appendValue(namespace)
+            .appendText(", name=").appendValue(name)
+            .appendText(", step=").appendValue(step)
+            .appendText(", update=").appendValue(update)
+            .appendText("}");
+      }
+    };
+  }
+
+  public static <T> Matcher<MetricResult<T>> metricResult(
+      final String namespace, final String name, final String step,
+      final T logical, final T physical) {
+    return new TypeSafeMatcher<MetricResult<T>>() {
+      @Override
+      protected boolean matchesSafely(MetricResult<T> item) {
+        return Objects.equals(namespace, item.name().namespace())
+            && Objects.equals(name, item.name().name())
+            && Objects.equals(step, item.step())
+            && Objects.equals(logical, item.committed())
+            && Objects.equals(physical, item.attempted());
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description
+            .appendText("MetricResult{inNamespace=").appendValue(namespace)
+            .appendText(", name=").appendValue(name)
+            .appendText(", step=").appendValue(step)
+            .appendText(", logical=").appendValue(logical)
+            .appendText(", physical=").appendValue(physical)
+            .appendText("}");
+      }
+    };
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
new file mode 100644
index 0000000..58797ce
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricUpdate;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.emptyIterable;
+import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link MetricsContainer}.
+ */
+@RunWith(JUnit4.class)
+public class MetricsContainerTest {
+
+  @Test
+  public void testCounterDeltas() {
+    MetricsContainer container = new MetricsContainer("step1");
+    CounterCell c1 = container.getCounter(MetricName.named("ns", "name1"));
+    CounterCell c2 = container.getCounter(MetricName.named("ns", "name2"));
+    assertThat("All counters should start out dirty",
+        container.getUpdates().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 0L),
+        metricUpdate("name2", 0L)));
+    container.commitUpdates();
+    assertThat("After commit no counters should be dirty",
+        container.getUpdates().counterUpdates(), emptyIterable());
+
+    c1.inc(5L);
+    c2.inc(4L);
+
+    assertThat(container.getUpdates().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 5L),
+        metricUpdate("name2", 4L)));
+
+    assertThat("Since we haven't committed, updates are still included",
+        container.getUpdates().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 5L),
+        metricUpdate("name2", 4L)));
+
+    container.commitUpdates();
+    assertThat("After commit there are no updates",
+        container.getUpdates().counterUpdates(), emptyIterable());
+
+    c1.inc(8L);
+    assertThat(container.getUpdates().counterUpdates(), contains(
+        metricUpdate("name1", 13L)));
+  }
+
+  @Test
+  public void testCounterCumulatives() {
+    MetricsContainer container = new MetricsContainer("step1");
+    CounterCell c1 = container.getCounter(MetricName.named("ns", "name1"));
+    CounterCell c2 = container.getCounter(MetricName.named("ns", "name2"));
+    c1.inc(2L);
+    c2.inc(4L);
+    c1.inc(3L);
+
+    container.getUpdates();
+    container.commitUpdates();
+    assertThat("Committing updates shouldn't affect cumulative counter values",
+        container.getCumulative().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 5L),
+        metricUpdate("name2", 4L)));
+
+    c1.inc(8L);
+    assertThat(container.getCumulative().counterUpdates(), containsInAnyOrder(
+        metricUpdate("name1", 13L),
+        metricUpdate("name2", 4L)));
+  }
+
+  @Test
+  public void testDistributionDeltas() {
+    MetricsContainer container = new MetricsContainer("step1");
+    DistributionCell c1 = container.getDistribution(MetricName.named("ns", 
"name1"));
+    DistributionCell c2 = container.getDistribution(MetricName.named("ns", 
"name2"));
+
+    assertThat("Initial update includes initial zero-values",
+        container.getUpdates().distributionUpdates(), containsInAnyOrder(
+        metricUpdate("name1", DistributionData.EMPTY),
+        metricUpdate("name2", DistributionData.EMPTY)));
+
+    container.commitUpdates();
+    assertThat("No updates after commit",
+        container.getUpdates().distributionUpdates(), emptyIterable());
+
+    c1.update(5L);
+    c2.update(4L);
+
+    assertThat(container.getUpdates().distributionUpdates(), 
containsInAnyOrder(
+        metricUpdate("name1", DistributionData.create(5, 1, 5, 5)),
+        metricUpdate("name2", DistributionData.create(4, 1, 4, 4))));
+    assertThat("Updates stay the same without commit",
+        container.getUpdates().distributionUpdates(), containsInAnyOrder(
+        metricUpdate("name1", DistributionData.create(5, 1, 5, 5)),
+        metricUpdate("name2", DistributionData.create(4, 1, 4, 4))));
+
+    container.commitUpdates();
+    assertThat("No updatess after commit",
+        container.getUpdates().distributionUpdates(), emptyIterable());
+
+    c1.update(8L);
+    c1.update(4L);
+    assertThat(container.getUpdates().distributionUpdates(), contains(
+        metricUpdate("name1", DistributionData.create(17, 3, 4, 8))));
+    container.commitUpdates();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
new file mode 100644
index 0000000..4200a20
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.metrics;
+
+import static org.apache.beam.sdk.metrics.MetricMatchers.metricUpdate;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link MetricsEnvironment}.
+ */
+@RunWith(JUnit4.class)
+public class MetricsEnvironmentTest {
+  @After
+  public void teardown() {
+    MetricsEnvironment.unsetMetricsContainer();
+  }
+
+  @Test
+  public void testUsesAppropriateMetricsContainer() {
+    Counter counter = Metrics.counter("ns", "name");
+    MetricsContainer c1 = new MetricsContainer("step1");
+    MetricsContainer c2 = new MetricsContainer("step2");
+
+    MetricsEnvironment.setMetricsContainer(c1);
+    counter.inc();
+    MetricsEnvironment.setMetricsContainer(c2);
+    counter.dec();
+    MetricsEnvironment.unsetMetricsContainer();
+
+    MetricUpdates updates1 = c1.getUpdates();
+    MetricUpdates updates2 = c2.getUpdates();
+    assertThat(updates1.counterUpdates(), contains(metricUpdate("ns", "name", 
"step1", 1L)));
+    assertThat(updates2.counterUpdates(), contains(metricUpdate("ns", "name", 
"step2", -1L)));
+  }
+
+  @Test
+  public void testBehavesWithoutMetricsContainer() {
+    assertNull(MetricsEnvironment.getCurrentContainer());
+  }
+}

Reply via email to