Initial Metrics API for Beam Java This includes a simple Counter metric and a Distribution metric that reports the SUM, COUNT, MIN, MAX and MEAN of the reported values.
The API is labeled @Experimental since metrics will only be reported and queryable with the DirectRunner, and the API may change as it is implemented on other runners. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8524ed95 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8524ed95 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8524ed95 Branch: refs/heads/master Commit: 8524ed9545f5af4bdeb54601f333549b34eb35aa Parents: e969f3d Author: bchambers <bchamb...@google.com> Authored: Wed Oct 12 10:29:50 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Thu Oct 13 15:29:29 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/annotations/Experimental.java | 3 + .../org/apache/beam/sdk/metrics/Counter.java | 40 +++++ .../apache/beam/sdk/metrics/CounterCell.java | 76 ++++++++++ .../org/apache/beam/sdk/metrics/DirtyState.java | 98 ++++++++++++ .../apache/beam/sdk/metrics/Distribution.java | 30 ++++ .../beam/sdk/metrics/DistributionCell.java | 58 +++++++ .../beam/sdk/metrics/DistributionData.java | 59 ++++++++ .../beam/sdk/metrics/DistributionResult.java | 42 ++++++ .../org/apache/beam/sdk/metrics/Metric.java | 24 +++ .../org/apache/beam/sdk/metrics/MetricCell.java | 47 ++++++ .../org/apache/beam/sdk/metrics/MetricKey.java | 40 +++++ .../org/apache/beam/sdk/metrics/MetricName.java | 46 ++++++ .../beam/sdk/metrics/MetricNameFilter.java | 60 ++++++++ .../beam/sdk/metrics/MetricQueryResults.java | 33 ++++ .../apache/beam/sdk/metrics/MetricResult.java | 45 ++++++ .../apache/beam/sdk/metrics/MetricResults.java | 34 +++++ .../apache/beam/sdk/metrics/MetricUpdates.java | 72 +++++++++ .../org/apache/beam/sdk/metrics/Metrics.java | 110 ++++++++++++++ .../beam/sdk/metrics/MetricsContainer.java | 150 +++++++++++++++++++ .../beam/sdk/metrics/MetricsEnvironment.java | 85 +++++++++++ .../apache/beam/sdk/metrics/MetricsFilter.java | 86 +++++++++++ .../org/apache/beam/sdk/metrics/MetricsMap.java | 86 +++++++++++ .../apache/beam/sdk/metrics/package-info.java | 28 ++++ .../beam/sdk/metrics/CounterCellTest.java | 55 +++++++ .../apache/beam/sdk/metrics/DirtyStateTest.java | 56 +++++++ .../beam/sdk/metrics/DistributionCellTest.java | 53 +++++++ .../apache/beam/sdk/metrics/MetricMatchers.java | 99 ++++++++++++ .../beam/sdk/metrics/MetricsContainerTest.java | 129 ++++++++++++++++ .../sdk/metrics/MetricsEnvironmentTest.java | 63 ++++++++ .../apache/beam/sdk/metrics/MetricsMapTest.java | 103 +++++++++++++ .../apache/beam/sdk/metrics/MetricsTest.java | 98 ++++++++++++ 31 files changed, 2008 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java index 789f4b2..14d2358 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java @@ -83,5 +83,8 @@ public @interface Experimental { * Do not use: API is unstable and runner support is incomplete. */ SPLITTABLE_DO_FN, + + /** Metrics-related experimental APIs. */ + METRICS } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java new file mode 100644 index 0000000..9f48016 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Counter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * A metric that reports a single long value and can be incremented or decremented. + */ +@Experimental(Kind.METRICS) +public interface Counter extends Metric { + + /** Increment the counter. */ + void inc(); + + /** Increment the counter by the given amount. */ + void inc(long n); + + /* Decrement the counter. */ + void dec(); + + /* Decrement the counter by the given amount. */ + void dec(long n); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java new file mode 100644 index 0000000..bb65833 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/CounterCell.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import java.util.concurrent.atomic.AtomicLong; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Tracks the current value (and delta) for a Counter metric for a specific context and bundle. + * + * <p>This class generally shouldn't be used directly. The only exception is within a runner where + * a counter is being reported for a specific step (rather than the counter in the current context). + */ +@Experimental(Kind.METRICS) +class CounterCell implements MetricCell<Counter, Long>, Counter { + + private final DirtyState dirty = new DirtyState(); + private final AtomicLong value = new AtomicLong(); + + /** Increment the counter by the given amount. */ + private void add(long n) { + value.addAndGet(n); + dirty.afterModification(); + } + + @Override + public DirtyState getDirty() { + return dirty; + } + + @Override + public Long getCumulative() { + return value.get(); + } + + @Override + public Counter getInterface() { + return this; + } + + @Override + public void inc() { + add(1); + } + + @Override + public void inc(long n) { + add(n); + } + + @Override + public void dec() { + add(-1); + } + + @Override + public void dec(long n) { + add(-n); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java new file mode 100644 index 0000000..6706be8 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DirtyState.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Atomically tracks the dirty-state of a metric. + * + * <p>Reporting an update is split into two parts such that only changes made before the call to + * {@link #beforeCommit()} are committed when {@link #afterCommit()} is invoked. This allows for + * a two-step commit process of gathering all the dirty updates (calling {#link beforeCommit()}) + * followed by committing and calling {#link afterCommit()}. + * + * <p>The tracking of dirty states is done conservatively -- sometimes {@link #beforeCommit()} + * will return true (indicating a dirty metric) even if there have been no changes since the last + * commit. + * + * <p>There is also a possible race when the underlying metric is modified but the call to + * {@link #afterModification()} hasn't happened before the call to {@link #beforeCommit()}. In this + * case the next round of metric updating will see the changes. If this was for the final commit, + * then the metric updates shouldn't be extracted until all possible user modifications have + * completed. + */ +@Experimental(Kind.METRICS) +class DirtyState { + private enum State { + /** Indicates that there have been changes to the MetricCell since last commit. */ + DIRTY, + /** Indicates that there have been no changes to the MetricCell since last commit. */ + CLEAN, + /** Indicates that a commit of the current value is in progress. */ + COMMITTING + } + + private final AtomicReference<State> dirty = new AtomicReference<>(State.DIRTY); + + /** + * Indicate that changes have been made to the metric being tracked by this {@link DirtyState}. + * + * <p>Should be called <b>after</b> modification of the value. + */ + public void afterModification() { + dirty.set(State.DIRTY); + } + + /** + * Check the dirty state and mark the metric as committing. + * + * <p>If the state was {@code CLEAN}, this returns {@code false}. If the state was {@code DIRTY} + * or {@code COMMITTING} this returns {@code true} and sets the state to {@code COMMITTING}. + * + * @return {@code false} if the state is clean and {@code true} otherwise. + */ + public boolean beforeCommit() { + // After this loop, we want the state to be either CLEAN or COMMITTING. + // If the state was CLEAN, we don't need to do anything (and exit the loop early) + // If the state was DIRTY, we will attempt to do a CAS(DIRTY, COMMITTING). This will only + // fail if another thread is getting updates which generally shouldn't be the case. + // If the state was COMMITTING, we will attempt to do a CAS(COMMITTING, COMMITTING). This will + // fail if another thread commits updates (which shouldn't be the case) or if the user code + // updates the metric, in which case it will transition to DIRTY and the next iteration will + // successfully update it. + State state; + do { + state = dirty.get(); + } while (state != State.CLEAN && !dirty.compareAndSet(state, State.COMMITTING)); + + return state != State.CLEAN; + } + + /** + * Mark any changes up to the most recently call to {@link #beforeCommit()}} as committed. + * The next call to {@link #beforeCommit()} will return {@code false} unless there have + * been changes made since the previous call to {@link #beforeCommit()}. + */ + public void afterCommit() { + dirty.compareAndSet(State.COMMITTING, State.CLEAN); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java new file mode 100644 index 0000000..b789020 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Distribution.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * A metric that reports information about the distribution of reported values. + */ +@Experimental(Kind.METRICS) +public interface Distribution extends Metric { + /** Add an observation to this distribution. */ + void update(long value); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java new file mode 100644 index 0000000..f0074a9 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionCell.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import java.util.concurrent.atomic.AtomicReference; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Tracks the current value (and delta) for a Distribution metric. + */ +@Experimental(Kind.METRICS) +class DistributionCell implements MetricCell<Distribution, DistributionData>, Distribution { + + private final DirtyState dirty = new DirtyState(); + private final AtomicReference<DistributionData> value = + new AtomicReference<DistributionData>(DistributionData.EMPTY); + + /** Increment the counter by the given amount. */ + @Override + public void update(long n) { + DistributionData original; + do { + original = value.get(); + } while (!value.compareAndSet(original, original.combine(DistributionData.singleton(n)))); + dirty.afterModification(); + } + + @Override + public DirtyState getDirty() { + return dirty; + } + + @Override + public DistributionData getCumulative() { + return value.get(); + } + + @Override + public Distribution getInterface() { + return this; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java new file mode 100644 index 0000000..59c7fbd --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionData.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.auto.value.AutoValue; + +/** + * Data describing the the distribution. This should retain enough detail that it can be combined + * with other {@link DistributionData}. + * + * <p>This is kept distinct from {@link DistributionResult} since this may be extended to include + * data necessary to approximate quantiles, etc. while {@link DistributionResult} would just include + * the approximate value of those quantiles. + */ +@AutoValue +public abstract class DistributionData { + + public abstract long sum(); + public abstract long count(); + public abstract long min(); + public abstract long max(); + + public static final DistributionData EMPTY = create(0, 0, Long.MAX_VALUE, Long.MIN_VALUE); + + public static DistributionData create(long sum, long count, long min, long max) { + return new AutoValue_DistributionData(sum, count, min, max); + } + + public static DistributionData singleton(long value) { + return create(value, 1, value, value); + } + + public DistributionData combine(DistributionData value) { + return create( + sum() + value.sum(), + count() + value.count(), + Math.min(value.min(), min()), + Math.max(value.max(), max())); + } + + public DistributionResult extractResult() { + return DistributionResult.create(sum(), count(), min(), max()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java new file mode 100644 index 0000000..27c242c --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DistributionResult.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.auto.value.AutoValue; + +/** + * The result of a {@link Distribution} metric. + */ +@AutoValue +public abstract class DistributionResult { + + public abstract long sum(); + public abstract long count(); + public abstract long min(); + public abstract long max(); + + public double mean() { + return (1.0 * sum()) / count(); + } + + public static final DistributionResult ZERO = create(0, 0, Long.MAX_VALUE, Long.MIN_VALUE); + + public static DistributionResult create(long sum, long count, long min, long max) { + return new AutoValue_DistributionResult(sum, count, min, max); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java new file mode 100644 index 0000000..37a5f65 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metric.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +/** + * Marker interface for all user-facing metrics. + */ +public interface Metric { } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java new file mode 100644 index 0000000..211b2dd --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricCell.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * A {@link MetricCell} is used for accumulating in-memory changes to a metric. It represents a + * specific metric name in a single context. + * + * @param <UserT> The type of the user interface for reporting changes to this cell. + * @param <DataT> The type of metric data stored (and extracted) from this cell. + */ +@Experimental(Kind.METRICS) +interface MetricCell<UserT extends Metric, DataT> { + + /** + * Return the {@link DirtyState} tracking whether this metric cell contains uncommitted changes. + */ + DirtyState getDirty(); + + /** + * Return the cumulative value of this metric. + */ + DataT getCumulative(); + + /** + * Return the user-facing mutator for this cell. + */ + UserT getInterface(); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java new file mode 100644 index 0000000..bfa4df5 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricKey.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Metrics are keyed by the step name they are associated with and the name of the metric. + */ +@Experimental(Kind.METRICS) +@AutoValue +public abstract class MetricKey { + + /** The step name that is associated with this metric. */ + public abstract String stepName(); + + /** The name of the metric. */ + public abstract MetricName metricName(); + + public static MetricKey create(String stepName, MetricName metricName) { + return new AutoValue_MetricKey(stepName, metricName); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java new file mode 100644 index 0000000..843a885 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricName.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * The name of a metric consists of a {@link #namespace} and a {@link #name}. The {@link #namespace} + * allows grouping related metrics together and also prevents collisions between multiple metrics + * with the same name. + */ +@Experimental(Kind.METRICS) +@AutoValue +public abstract class MetricName { + + /** The namespace associated with this metric. */ + public abstract String namespace(); + + /** The name of this metric. */ + public abstract String name(); + + public static MetricName named(String namespace, String name) { + return new AutoValue_MetricName(namespace, name); + } + + public static MetricName named(Class<?> namespace, String name) { + return new AutoValue_MetricName(namespace.getName(), name); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java new file mode 100644 index 0000000..a2c3798 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricNameFilter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * The name of a metric. + */ +@Experimental(Kind.METRICS) +@AutoValue +public abstract class MetricNameFilter { + + /** The inNamespace that a metric must be in to match this {@link MetricNameFilter}. */ + public abstract String getNamespace(); + + /** If set, the metric must have this name to match this {@link MetricNameFilter}. */ + @Nullable + public abstract String getName(); + + public static MetricNameFilter inNamespace(String namespace) { + return new AutoValue_MetricNameFilter(namespace, null); + } + + public static MetricNameFilter inNamespace(Class<?> namespace) { + return new AutoValue_MetricNameFilter(namespace.getName(), null); + } + + public static MetricNameFilter named(String namespace, String name) { + checkNotNull(name, "Must specify a name"); + return new AutoValue_MetricNameFilter(namespace, name); + } + + public static MetricNameFilter named(Class<?> namespace, String name) { + checkNotNull(namespace, "Must specify a inNamespace"); + checkNotNull(name, "Must specify a name"); + return new AutoValue_MetricNameFilter(namespace.getSimpleName(), name); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java new file mode 100644 index 0000000..2241ba8 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricQueryResults.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * The results of a query for metrics. Allows accessing all of the metrics that matched the filter. + */ +@Experimental(Kind.METRICS) +public interface MetricQueryResults { + /** Return the metric results for the counters that matched the filter. */ + Iterable<MetricResult<Long>> counters(); + + /** Return the metric results for the distributions that matched the filter. */ + Iterable<MetricResult<DistributionResult>> distributions(); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java new file mode 100644 index 0000000..9a3971a --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * The results of a single current metric. + */ +@Experimental(Kind.METRICS) +public interface MetricResult<T> { + /** Return the name of the metric. */ + MetricName name(); + /** Return the step context to which this metric result applies. */ + String step(); + + /** + * Return the value of this metric across all successfully completed parts of the pipeline. + * + * <p>Not all runners will support committed metrics. If they are not supported, the runner will + * throw an {@link UnsupportedOperationException}. + */ + T committed(); + + /** + * Return the value of this metric across all attempts of executing all parts of the pipeline. + */ + T attempted(); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java new file mode 100644 index 0000000..dab65ea --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResults.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Methods for interacting with the metrics of a pipeline that has been executed. Accessed via + * {@link PipelineResult#metrics()}. + */ +@Experimental(Kind.METRICS) +public abstract class MetricResults { + /** + * Query for all metrics that match the filter. + */ + public abstract MetricQueryResults queryMetrics(MetricsFilter filter); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java new file mode 100644 index 0000000..e84dc66 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricUpdates.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.Iterables; +import java.util.Collections; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Representation of multiple metric updates. + */ +@Experimental(Kind.METRICS) +@AutoValue +public abstract class MetricUpdates { + + public static final MetricUpdates EMPTY = MetricUpdates.create( + Collections.<MetricUpdate<Long>>emptyList(), + Collections.<MetricUpdate<DistributionData>>emptyList()); + + /** + * Representation of a single metric update. + * @param <T> The type of value representing the update. + */ + @AutoValue + public abstract static class MetricUpdate<T> { + + /** The key being updated. */ + public abstract MetricKey getKey(); + /** The value of the update. */ + public abstract T getUpdate(); + + public static <T> MetricUpdate<T> create(MetricKey key, T update) { + return new AutoValue_MetricUpdates_MetricUpdate(key, update); + } + } + + /** Returns true if there are no updates in this MetricUpdates object. */ + public boolean isEmpty() { + return Iterables.isEmpty(counterUpdates()) + && Iterables.isEmpty(distributionUpdates()); + } + + /** All of the counter updates. */ + public abstract Iterable<MetricUpdate<Long>> counterUpdates(); + + /** All of the distribution updates. */ + public abstract Iterable<MetricUpdate<DistributionData>> distributionUpdates(); + + /** Create a new {@link MetricUpdates} bundle. */ + public static MetricUpdates create( + Iterable<MetricUpdate<Long>> counterUpdates, + Iterable<MetricUpdate<DistributionData>> distributionUpdates) { + return new AutoValue_MetricUpdates(counterUpdates, distributionUpdates); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java new file mode 100644 index 0000000..b72a0b2 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/Metrics.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * The <code>Metrics</code> is a utility class for producing various kinds of metrics for + * reporting properties of an executing pipeline. + */ +@Experimental(Kind.METRICS) +public class Metrics { + + private Metrics() {} + + /** + * Create a metric that can be incremented and decremented, and is aggregated by taking the sum. + */ + public static Counter counter(String namespace, String name) { + return new DelegatingCounter(MetricName.named(namespace, name)); + } + + /** + * Create a metric that can be incremented and decremented, and is aggregated by taking the sum. + */ + public static Counter counter(Class<?> namespace, String name) { + return new DelegatingCounter(MetricName.named(namespace, name)); + } + + /** + * Create a metric that records various statistics about the distribution of reported values. + */ + public static Distribution distribution(String namespace, String name) { + return new DelegatingDistribution(MetricName.named(namespace, name)); + } + + /** + * Create a metric that records various statistics about the distribution of reported values. + */ + public static Distribution distribution(Class<?> namespace, String name) { + return new DelegatingDistribution(MetricName.named(namespace, name)); + } + + /** Implementation of {@link Counter} that delegates to the instance for the current context. */ + private static class DelegatingCounter implements Counter { + private final MetricName name; + + private DelegatingCounter(MetricName name) { + this.name = name; + } + + /** Increment the counter. */ + @Override public void inc() { + inc(1); + } + + /** Increment the counter by the given amount. */ + @Override public void inc(long n) { + MetricsContainer container = MetricsEnvironment.getCurrentContainer(); + if (container != null) { + container.getCounter(name).inc(n); + } + } + + /* Decrement the counter. */ + @Override public void dec() { + inc(-1); + } + + /* Decrement the counter by the given amount. */ + @Override public void dec(long n) { + inc(-1 * n); + } + } + + /** + * Implementation of {@link Distribution} that delegates to the instance for the current context. + */ + private static class DelegatingDistribution implements Distribution { + private final MetricName name; + + private DelegatingDistribution(MetricName name) { + this.name = name; + } + + @Override + public void update(long value) { + MetricsContainer container = MetricsEnvironment.getCurrentContainer(); + if (container != null) { + container.getDistribution(name).update(value); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java new file mode 100644 index 0000000..10032a2 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsContainer.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.ImmutableList; +import java.util.Map; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; + +/** + * Holds the metrics for a single step and unit-of-commit (bundle). + * + * <p>This class is thread-safe. It is intended to be used with 1 (or more) threads are updating + * metrics and at-most 1 thread is extracting updates by calling {@link #getUpdates} and + * {@link #commitUpdates}. Outside of this it is still safe. Although races in the update extraction + * may cause updates that don't actually have any changes, it will never lose an update. + * + * <p>For consistency, all threads that update metrics should finish before getting the final + * cumulative values/updates. + */ +@Experimental(Kind.METRICS) +public class MetricsContainer { + + private final String stepName; + + private MetricsMap<MetricName, CounterCell> counters = + new MetricsMap<>(new MetricsMap.Factory<MetricName, CounterCell>() { + @Override + public CounterCell createInstance(MetricName unusedKey) { + return new CounterCell(); + } + }); + + private MetricsMap<MetricName, DistributionCell> distributions = + new MetricsMap<>(new MetricsMap.Factory<MetricName, DistributionCell>() { + @Override + public DistributionCell createInstance(MetricName unusedKey) { + return new DistributionCell(); + } + }); + + /** + * Create a new {@link MetricsContainer} associated with the given {@code stepName}. + */ + public MetricsContainer(String stepName) { + this.stepName = stepName; + } + + /** + * Return the {@link CounterCell} that should be used for implementing the given + * {@code metricName} in this container. + */ + public CounterCell getCounter(MetricName metricName) { + return counters.get(metricName); + } + + public DistributionCell getDistribution(MetricName metricName) { + return distributions.get(metricName); + } + + private <UpdateT, CellT extends MetricCell<?, UpdateT>> + ImmutableList<MetricUpdate<UpdateT>> extractUpdates( + MetricsMap<MetricName, CellT> cells) { + ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder(); + for (Map.Entry<MetricName, CellT> cell : cells.entries()) { + if (cell.getValue().getDirty().beforeCommit()) { + updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()), + cell.getValue().getCumulative())); + } + } + return updates.build(); + } + + /** + * Return the cumulative values for any metrics that have changed since the last time updates were + * committed. + */ + public MetricUpdates getUpdates() { + return MetricUpdates.create( + extractUpdates(counters), + extractUpdates(distributions)); + } + + private void commitUpdates(MetricsMap<MetricName, ? extends MetricCell<?, ?>> cells) { + for (MetricCell<?, ?> cell : cells.values()) { + cell.getDirty().afterCommit(); + } + } + + /** + * Mark all of the updates that were retrieved with the latest call to {@link #getUpdates()} as + * committed. + */ + public void commitUpdates() { + commitUpdates(counters); + commitUpdates(distributions); + } + + private <UpdateT, CellT extends MetricCell<?, UpdateT>> + ImmutableList<MetricUpdate<UpdateT>> extractCumulatives( + MetricsMap<MetricName, CellT> cells) { + ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder(); + for (Map.Entry<MetricName, CellT> cell : cells.entries()) { + UpdateT update = checkNotNull(cell.getValue().getCumulative()); + updates.add(MetricUpdate.create(MetricKey.create(stepName, cell.getKey()), update)); + } + return updates.build(); + } + + /** + * Return the {@link MetricUpdates} representing the cumulative values of all metrics in this + * container. + */ + public MetricUpdates getCumulative() { + ImmutableList.Builder<MetricUpdate<Long>> counterUpdates = ImmutableList.builder(); + for (Map.Entry<MetricName, CounterCell> counter : counters.entries()) { + counterUpdates.add(MetricUpdate.create( + MetricKey.create(stepName, counter.getKey()), counter.getValue().getCumulative())); + } + + ImmutableList.Builder<MetricUpdate<DistributionData>> distributionUpdates = + ImmutableList.builder(); + for (Map.Entry<MetricName, DistributionCell> distribution : distributions.entries()) { + distributionUpdates.add(MetricUpdate.create( + MetricKey.create(stepName, distribution.getKey()), + distribution.getValue().getCumulative())); + } + return MetricUpdates.create( + extractCumulatives(counters), + extractCumulatives(distributions)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java new file mode 100644 index 0000000..ef2660a8 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages and provides the metrics container associated with each thread. + * + * <p>Users should not interact directly with this class. Instead, use {@link Metrics} and the + * returned objects to create and modify metrics. + * + * <p>The runner should create {@link MetricsContainer} for each context in which metrics are + * reported (by step and name) and call {@link #setMetricsContainer} before invoking any code that + * may update metrics within that step. + * + * <p>The runner should call {@link #unsetMetricsContainer} (or {@link #setMetricsContainer} back to + * the previous value) when exiting code that set the metrics container. + */ +public class MetricsEnvironment { + + private static final Logger LOGGER = LoggerFactory.getLogger(MetricsContainer.class); + + private static final AtomicBoolean METRICS_SUPPORTED = new AtomicBoolean(false); + private static final AtomicBoolean REPORTED_MISSING_CONTAINER = new AtomicBoolean(false); + + private static final ThreadLocal<MetricsContainer> CONTAINER_FOR_THREAD = + new ThreadLocal<MetricsContainer>(); + + /** Set the {@link MetricsContainer} for the current thread. */ + public static void setMetricsContainer(MetricsContainer container) { + CONTAINER_FOR_THREAD.set(container); + } + + + /** Clear the {@link MetricsContainer} for the current thread. */ + public static void unsetMetricsContainer() { + CONTAINER_FOR_THREAD.remove(); + } + + /** Called by the run to indicate whether metrics reporting is supported. */ + public static void setMetricsSupported(boolean supported) { + METRICS_SUPPORTED.set(supported); + } + + /** + * Return the {@link MetricsContainer} for the current thread. + * + * <p>May return null if metrics are not supported by the current runner or if the current thread + * is not a work-execution thread. The first time this happens in a given thread it will log a + * diagnostic message. + */ + @Nullable + public static MetricsContainer getCurrentContainer() { + MetricsContainer container = CONTAINER_FOR_THREAD.get(); + if (container == null && REPORTED_MISSING_CONTAINER.compareAndSet(false, true)) { + if (METRICS_SUPPORTED.get()) { + LOGGER.error( + "Unable to update metrics on the current thread. " + + "Most likely caused by using metrics outside the managed work-execution thread."); + } else { + LOGGER.warn("Reporting metrics are not supported in the current execution environment."); + } + } + return container; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java new file mode 100644 index 0000000..ec81251 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsFilter.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableSet; +import java.util.Set; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Simple POJO representing a filter for querying metrics. + */ +@Experimental(Kind.METRICS) +@AutoValue +public abstract class MetricsFilter { + + public Set<String> steps() { + return immutableSteps(); + } + + public Set<MetricNameFilter> names() { + return immutableNames(); + } + + protected abstract ImmutableSet<String> immutableSteps(); + protected abstract ImmutableSet<MetricNameFilter> immutableNames(); + + public static Builder builder() { + return new AutoValue_MetricsFilter.Builder(); + } + + /** + * Builder for creating a {@link MetricsFilter}. + */ + @AutoValue.Builder + public abstract static class Builder { + + protected abstract ImmutableSet.Builder<MetricNameFilter> immutableNamesBuilder(); + protected abstract ImmutableSet.Builder<String> immutableStepsBuilder(); + + /** + * Add a {@link MetricNameFilter}. + * + * <p>If no name filters are specified then all metric names will be inculded. + * + * + * <p>If one or more name filters are specified, then only metrics that match one or more of the + * filters will be included. + */ + public Builder addNameFilter(MetricNameFilter nameFilter) { + immutableNamesBuilder().add(nameFilter); + return this; + } + + /** + * Add a step filter. + * + * <p>If no steps are specified then metrics will be included for all steps. + * + * <p>If one or more steps are specified, then metrics will be included if they are part of + * any of the specified steps. + */ + public Builder addStep(String step) { + immutableStepsBuilder().add(step); + return this; + } + + public abstract MetricsFilter build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java new file mode 100644 index 0000000..5a02106 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsMap.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.metrics; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.Iterables; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * A map from {@code K} to {@code T} that supports getting or creating values associated with a key + * in a thread-safe manner. + */ +@Experimental(Kind.METRICS) +public class MetricsMap<K, T> { + + /** Interface for creating instances to populate the {@link MetricsMap}. */ + public interface Factory<K, T> { + /** + * Create an instance of {@code T} to use with the given {@code key}. + * + * <p>It must be safe to call this from multiple threads. + */ + T createInstance(K key); + } + + private final Factory<K, T> factory; + private final ConcurrentMap<K, T> metrics = new ConcurrentHashMap<>(); + + public MetricsMap(Factory<K, T> factory) { + this.factory = factory; + } + + /** + * Get or create the value associated with the given key. + */ + public T get(K key) { + T metric = metrics.get(key); + if (metric == null) { + metric = factory.createInstance(key); + metric = MoreObjects.firstNonNull(metrics.putIfAbsent(key, metric), metric); + } + return metric; + } + + /** + * Get the value associated with the given key, if it exists. + */ + @Nullable + public T tryGet(K key) { + return metrics.get(key); + } + + /** + * Return an iterable over the entries in the current {@link MetricsMap}. + */ + public Iterable<Map.Entry<K, T>> entries() { + return Iterables.unmodifiableIterable(metrics.entrySet()); + } + + /** + * Return an iterable over the values in the current {@link MetricsMap}. + */ + public Iterable<T> values() { + return Iterables.unmodifiableIterable(metrics.values()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java new file mode 100644 index 0000000..f71dc7a --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Metrics allow exporting information about the execution of a pipeline. + * They are intended to be used for monitoring and understanding the + * execution. + * + * <p>Metrics may also be queried from the {@link org.apache.beam.sdk.PipelineResult} object. + * + * <p>Runners should look at {@link org.apache.beam.sdk.metrics.MetricsContainer} for details on + * how to support metrics. + */ +package org.apache.beam.sdk.metrics; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java new file mode 100644 index 0000000..408f145 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/CounterCellTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link CounterCell}. + */ +@RunWith(JUnit4.class) +public class CounterCellTest { + + private CounterCell cell = new CounterCell(); + + @Test + public void testDeltaAndCumulative() { + cell.inc(5); + cell.inc(7); + assertThat(cell.getCumulative(), equalTo(12L)); + assertThat("getCumulative is idempotent", cell.getCumulative(), equalTo(12L)); + + assertThat(cell.getDirty().beforeCommit(), equalTo(true)); + cell.getDirty().afterCommit(); + assertThat(cell.getDirty().beforeCommit(), equalTo(false)); + assertThat(cell.getCumulative(), equalTo(12L)); + + cell.inc(30); + assertThat(cell.getCumulative(), equalTo(42L)); + + assertThat(cell.getDirty().beforeCommit(), equalTo(true)); + cell.getDirty().afterCommit(); + assertThat(cell.getDirty().beforeCommit(), equalTo(false)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java new file mode 100644 index 0000000..d00f8cd --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DirtyStateTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link DirtyStateTest}. + */ +@RunWith(JUnit4.class) +public class DirtyStateTest { + + private final DirtyState dirty = new DirtyState(); + + @Test + public void basicPath() { + assertThat("Should start dirty", dirty.beforeCommit(), is(true)); + dirty.afterCommit(); + assertThat("Should be clean after commit", dirty.beforeCommit(), is(false)); + + dirty.afterModification(); + assertThat("Should be dirty after change", dirty.beforeCommit(), is(true)); + dirty.afterCommit(); + assertThat("Should be clean after commit", dirty.beforeCommit(), is(false)); + } + + @Test + public void changeAfterBeforeCommit() { + assertThat("Should start dirty", dirty.beforeCommit(), is(true)); + dirty.afterModification(); + dirty.afterCommit(); + assertThat("Changes after beforeCommit should be dirty after afterCommit", + dirty.beforeCommit(), is(true)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java new file mode 100644 index 0000000..07e0b26 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/DistributionCellTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link DistributionCell}. + */ +@RunWith(JUnit4.class) +public class DistributionCellTest { + private DistributionCell cell = new DistributionCell(); + + @Test + public void testDeltaAndCumulative() { + cell.update(5); + cell.update(7); + assertThat(cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 7))); + assertThat("getCumulative is idempotent", + cell.getCumulative(), equalTo(DistributionData.create(12, 2, 5, 7))); + + assertThat(cell.getDirty().beforeCommit(), equalTo(true)); + cell.getDirty().afterCommit(); + assertThat(cell.getDirty().beforeCommit(), equalTo(false)); + + cell.update(30); + assertThat(cell.getCumulative(), equalTo(DistributionData.create(42, 3, 5, 30))); + + assertThat("Adding a new value made the cell dirty", + cell.getDirty().beforeCommit(), equalTo(true)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java new file mode 100644 index 0000000..bdcb94f --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricMatchers.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import java.util.Objects; +import org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; + +/** + * Matchers for metrics. + */ +public class MetricMatchers { + + public static <T> Matcher<MetricUpdate<T>> metricUpdate(final String name, final T update) { + return new TypeSafeMatcher<MetricUpdate<T>>() { + @Override + protected boolean matchesSafely(MetricUpdate<T> item) { + return Objects.equals(name, item.getKey().metricName().name()) + && Objects.equals(update, item.getUpdate()); + } + + @Override + public void describeTo(Description description) { + description + .appendText("MetricUpdate{name=").appendValue(name) + .appendText(", update=").appendValue(update) + .appendText("}"); + } + }; + } + + public static <T> Matcher<MetricUpdate<T>> metricUpdate( + final String namespace, final String name, final String step, final T update) { + return new TypeSafeMatcher<MetricUpdate<T>>() { + @Override + protected boolean matchesSafely(MetricUpdate<T> item) { + return Objects.equals(namespace, item.getKey().metricName().namespace()) + && Objects.equals(name, item.getKey().metricName().name()) + && Objects.equals(step, item.getKey().stepName()) + && Objects.equals(update, item.getUpdate()); + } + + @Override + public void describeTo(Description description) { + description + .appendText("MetricUpdate{inNamespace=").appendValue(namespace) + .appendText(", name=").appendValue(name) + .appendText(", step=").appendValue(step) + .appendText(", update=").appendValue(update) + .appendText("}"); + } + }; + } + + public static <T> Matcher<MetricResult<T>> metricResult( + final String namespace, final String name, final String step, + final T logical, final T physical) { + return new TypeSafeMatcher<MetricResult<T>>() { + @Override + protected boolean matchesSafely(MetricResult<T> item) { + return Objects.equals(namespace, item.name().namespace()) + && Objects.equals(name, item.name().name()) + && Objects.equals(step, item.step()) + && Objects.equals(logical, item.committed()) + && Objects.equals(physical, item.attempted()); + } + + @Override + public void describeTo(Description description) { + description + .appendText("MetricResult{inNamespace=").appendValue(namespace) + .appendText(", name=").appendValue(name) + .appendText(", step=").appendValue(step) + .appendText(", logical=").appendValue(logical) + .appendText(", physical=").appendValue(physical) + .appendText("}"); + } + }; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java new file mode 100644 index 0000000..58797ce --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsContainerTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import static org.apache.beam.sdk.metrics.MetricMatchers.metricUpdate; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link MetricsContainer}. + */ +@RunWith(JUnit4.class) +public class MetricsContainerTest { + + @Test + public void testCounterDeltas() { + MetricsContainer container = new MetricsContainer("step1"); + CounterCell c1 = container.getCounter(MetricName.named("ns", "name1")); + CounterCell c2 = container.getCounter(MetricName.named("ns", "name2")); + assertThat("All counters should start out dirty", + container.getUpdates().counterUpdates(), containsInAnyOrder( + metricUpdate("name1", 0L), + metricUpdate("name2", 0L))); + container.commitUpdates(); + assertThat("After commit no counters should be dirty", + container.getUpdates().counterUpdates(), emptyIterable()); + + c1.inc(5L); + c2.inc(4L); + + assertThat(container.getUpdates().counterUpdates(), containsInAnyOrder( + metricUpdate("name1", 5L), + metricUpdate("name2", 4L))); + + assertThat("Since we haven't committed, updates are still included", + container.getUpdates().counterUpdates(), containsInAnyOrder( + metricUpdate("name1", 5L), + metricUpdate("name2", 4L))); + + container.commitUpdates(); + assertThat("After commit there are no updates", + container.getUpdates().counterUpdates(), emptyIterable()); + + c1.inc(8L); + assertThat(container.getUpdates().counterUpdates(), contains( + metricUpdate("name1", 13L))); + } + + @Test + public void testCounterCumulatives() { + MetricsContainer container = new MetricsContainer("step1"); + CounterCell c1 = container.getCounter(MetricName.named("ns", "name1")); + CounterCell c2 = container.getCounter(MetricName.named("ns", "name2")); + c1.inc(2L); + c2.inc(4L); + c1.inc(3L); + + container.getUpdates(); + container.commitUpdates(); + assertThat("Committing updates shouldn't affect cumulative counter values", + container.getCumulative().counterUpdates(), containsInAnyOrder( + metricUpdate("name1", 5L), + metricUpdate("name2", 4L))); + + c1.inc(8L); + assertThat(container.getCumulative().counterUpdates(), containsInAnyOrder( + metricUpdate("name1", 13L), + metricUpdate("name2", 4L))); + } + + @Test + public void testDistributionDeltas() { + MetricsContainer container = new MetricsContainer("step1"); + DistributionCell c1 = container.getDistribution(MetricName.named("ns", "name1")); + DistributionCell c2 = container.getDistribution(MetricName.named("ns", "name2")); + + assertThat("Initial update includes initial zero-values", + container.getUpdates().distributionUpdates(), containsInAnyOrder( + metricUpdate("name1", DistributionData.EMPTY), + metricUpdate("name2", DistributionData.EMPTY))); + + container.commitUpdates(); + assertThat("No updates after commit", + container.getUpdates().distributionUpdates(), emptyIterable()); + + c1.update(5L); + c2.update(4L); + + assertThat(container.getUpdates().distributionUpdates(), containsInAnyOrder( + metricUpdate("name1", DistributionData.create(5, 1, 5, 5)), + metricUpdate("name2", DistributionData.create(4, 1, 4, 4)))); + assertThat("Updates stay the same without commit", + container.getUpdates().distributionUpdates(), containsInAnyOrder( + metricUpdate("name1", DistributionData.create(5, 1, 5, 5)), + metricUpdate("name2", DistributionData.create(4, 1, 4, 4)))); + + container.commitUpdates(); + assertThat("No updatess after commit", + container.getUpdates().distributionUpdates(), emptyIterable()); + + c1.update(8L); + c1.update(4L); + assertThat(container.getUpdates().distributionUpdates(), contains( + metricUpdate("name1", DistributionData.create(17, 3, 4, 8)))); + container.commitUpdates(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8524ed95/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java new file mode 100644 index 0000000..4200a20 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsEnvironmentTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.sdk.metrics; + +import static org.apache.beam.sdk.metrics.MetricMatchers.metricUpdate; +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; + +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link MetricsEnvironment}. + */ +@RunWith(JUnit4.class) +public class MetricsEnvironmentTest { + @After + public void teardown() { + MetricsEnvironment.unsetMetricsContainer(); + } + + @Test + public void testUsesAppropriateMetricsContainer() { + Counter counter = Metrics.counter("ns", "name"); + MetricsContainer c1 = new MetricsContainer("step1"); + MetricsContainer c2 = new MetricsContainer("step2"); + + MetricsEnvironment.setMetricsContainer(c1); + counter.inc(); + MetricsEnvironment.setMetricsContainer(c2); + counter.dec(); + MetricsEnvironment.unsetMetricsContainer(); + + MetricUpdates updates1 = c1.getUpdates(); + MetricUpdates updates2 = c2.getUpdates(); + assertThat(updates1.counterUpdates(), contains(metricUpdate("ns", "name", "step1", 1L))); + assertThat(updates2.counterUpdates(), contains(metricUpdate("ns", "name", "step2", -1L))); + } + + @Test + public void testBehavesWithoutMetricsContainer() { + assertNull(MetricsEnvironment.getCurrentContainer()); + } +}