Repository: incubator-beam
Updated Branches:
  refs/heads/master 204678323 -> f346c877a


Added support for reporting aggregator values to Spark sinks


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/226dea2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/226dea2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/226dea2f

Branch: refs/heads/master
Commit: 226dea2f04de2c000733f1182bdd3d18d516d4e4
Parents: 2046783
Author: staslev <stasle...@gmail.com>
Authored: Fri Aug 26 10:26:38 2016 +0300
Committer: Sela <ans...@paypal.com>
Committed: Fri Aug 26 13:01:48 2016 +0300

----------------------------------------------------------------------
 runners/spark/pom.xml                           |   6 +
 .../runners/spark/SparkPipelineOptions.java     |   7 +-
 .../spark/aggregators/NamedAggregators.java     |  38 ++++-
 .../aggregators/metrics/AggregatorMetric.java   |  44 +++++
 .../metrics/AggregatorMetricSource.java         |  49 ++++++
 .../metrics/WithNamedAggregatorsSupport.java    | 169 +++++++++++++++++++
 .../spark/aggregators/metrics/package-info.java |  22 +++
 .../spark/aggregators/metrics/sink/CsvSink.java |  39 +++++
 .../aggregators/metrics/sink/GraphiteSink.java  |  39 +++++
 .../aggregators/metrics/sink/package-info.java  |  23 +++
 .../apache/beam/runners/spark/io/ConsoleIO.java |   2 +-
 .../beam/runners/spark/io/hadoop/HadoopIO.java  |   4 +-
 .../spark/translation/SparkRuntimeContext.java  |  29 +++-
 .../runners/spark/util/BroadcastHelper.java     |   4 +-
 .../runners/spark/InMemoryMetricsSinkRule.java  |  32 ++++
 .../beam/runners/spark/SimpleWordCountTest.java |  12 ++
 .../metrics/sink/InMemoryMetrics.java           |  79 +++++++++
 .../spark/src/test/resources/metrics.properties |  29 ++++
 18 files changed, 611 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index b924cb8..b928b44 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -37,6 +37,7 @@
     <spark.version>1.6.2</spark.version>
     <hadoop.version>2.2.0</hadoop.version>
     <kafka.version>0.8.2.1</kafka.version>
+    <dropwizard.metrics.version>3.1.2</dropwizard.metrics.version>
   </properties>
 
   <profiles>
@@ -231,6 +232,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>${dropwizard.metrics.version}</version>
+    </dependency>
 
     <!-- test dependencies -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 080ff19..be4f7f0 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -38,10 +38,15 @@ public interface SparkPipelineOptions extends 
PipelineOptions, StreamingOptions,
           + "execution is stopped")
   @Default.Long(-1)
   Long getTimeout();
-  void setTimeout(Long batchInterval);
+  void setTimeout(Long timeoutMillis);
 
   @Description("Batch interval for Spark streaming in milliseconds.")
   @Default.Long(1000)
   Long getBatchIntervalMillis();
   void setBatchIntervalMillis(Long batchInterval);
+
+  @Description("Enable/disable sending aggregator values to Spark's metric 
sinks")
+  @Default.Boolean(true)
+  Boolean getEnableSparkSinks();
+  void setEnableSparkSinks(Boolean enableSparkSinks);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index e2cd963..4e96466 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -18,13 +18,18 @@
 
 package org.apache.beam.runners.spark.aggregators;
 
+import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.TreeMap;
+
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -70,6 +75,22 @@ public class NamedAggregators implements Serializable {
   }
 
   /**
+   * @return a map of all the aggregator names and their <b>rendered </b>values
+   */
+  public Map<String, ?> renderAll() {
+    // maps every aggregator state to its rendered value
+    final Function<State<?, ?, ?>, Object> renderState =
+        new Function<State<?, ?, ?>, Object>() {
+
+          @Override
+          public Object apply(State<?, ?, ?> state) {
+            return state.render();
+          }
+        };
+
+    // copy the transformed map into an immutable map before handing it out
+    return ImmutableMap.copyOf(Maps.transformValues(mNamedAggregators, renderState));
+  }
+
+  /**
    * Merges another NamedAggregators instance with this instance.
    *
    * @param other The other instance of named aggregators to merge.
@@ -116,6 +137,7 @@ public class NamedAggregators implements Serializable {
    * @param <OutputT>   Output data type
    */
   public interface State<InputT, InterT, OutputT> extends Serializable {
+
     /**
      * @param element new element to update state
      */
@@ -133,16 +155,16 @@ public class NamedAggregators implements Serializable {
   /**
    * =&gt; combineFunction in data flow.
    */
-  public static class CombineFunctionState<InputT, InterT, OutpuT>
-      implements State<InputT, InterT, OutpuT> {
+  public static class CombineFunctionState<InputT, InterT, OutputT>
+      implements State<InputT, InterT, OutputT> {
 
-    private Combine.CombineFn<InputT, InterT, OutpuT> combineFn;
+    private Combine.CombineFn<InputT, InterT, OutputT> combineFn;
     private Coder<InputT> inCoder;
     private SparkRuntimeContext ctxt;
     private transient InterT state;
 
     public CombineFunctionState(
-        Combine.CombineFn<InputT, InterT, OutpuT> combineFn,
+        Combine.CombineFn<InputT, InterT, OutputT> combineFn,
         Coder<InputT> inCoder,
         SparkRuntimeContext ctxt) {
       this.combineFn = combineFn;
@@ -157,7 +179,7 @@ public class NamedAggregators implements Serializable {
     }
 
     @Override
-    public State<InputT, InterT, OutpuT> merge(State<InputT, InterT, OutpuT> 
other) {
+    public State<InputT, InterT, OutputT> merge(State<InputT, InterT, OutputT> 
other) {
       this.state = combineFn.mergeAccumulators(ImmutableList.of(current(), 
other.current()));
       return this;
     }
@@ -168,12 +190,12 @@ public class NamedAggregators implements Serializable {
     }
 
     @Override
-    public OutpuT render() {
+    public OutputT render() {
       return combineFn.extractOutput(state);
     }
 
     @Override
-    public Combine.CombineFn<InputT, InterT, OutpuT> getCombineFn() {
+    public Combine.CombineFn<InputT, InterT, OutputT> getCombineFn() {
       return combineFn;
     }
 
@@ -192,7 +214,7 @@ public class NamedAggregators implements Serializable {
     @SuppressWarnings("unchecked")
     private void readObject(ObjectInputStream ois) throws IOException, 
ClassNotFoundException {
       ctxt = (SparkRuntimeContext) ois.readObject();
-      combineFn = (Combine.CombineFn<InputT, InterT, OutpuT>) ois.readObject();
+      combineFn = (Combine.CombineFn<InputT, InterT, OutputT>) 
ois.readObject();
       inCoder = (Coder<InputT>) ois.readObject();
       try {
         state = combineFn.getAccumulatorCoder(ctxt.getCoderRegistry(), inCoder)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java
new file mode 100644
index 0000000..c07a069
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetric.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators.metrics;
+
+import com.codahale.metrics.Metric;
+
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+
+/**
+ * Adapts a {@link NamedAggregators} instance to codahale's {@link Metric} type,
+ * so that the aggregators can be handed around wherever a {@link Metric} is expected.
+ */
+public class AggregatorMetric implements Metric {
+
+  /** The aggregators this metric stands for. */
+  private final NamedAggregators aggregators;
+
+  /** Not public: instances are created through {@link #of(NamedAggregators)}. */
+  private AggregatorMetric(final NamedAggregators namedAggregators) {
+    aggregators = namedAggregators;
+  }
+
+  /**
+   * Creates a metric wrapping the given aggregators.
+   *
+   * @param namedAggregators the aggregators to wrap
+   * @return a new {@link AggregatorMetric} holding {@code namedAggregators}
+   */
+  public static AggregatorMetric of(final NamedAggregators namedAggregators) {
+    return new AggregatorMetric(namedAggregators);
+  }
+
+  /**
+   * @return the wrapped aggregators, i.e. the very instance given to
+   *         {@link #of(NamedAggregators)}
+   */
+  NamedAggregators getNamedAggregators() {
+    return aggregators;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
new file mode 100644
index 0000000..0658e04
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/AggregatorMetricSource.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators.metrics;
+
+import com.codahale.metrics.MetricRegistry;
+
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+import org.apache.spark.metrics.source.Source;
+
+/**
+ * A Spark {@link Source} whose registry contains a single {@link AggregatorMetric}
+ * wrapping the supplied {@link NamedAggregators} instance.
+ */
+public class AggregatorMetricSource implements Source {
+
+  /** Serves both as the source name and as the name of the registered metric. */
+  private static final String SOURCE_NAME = "NamedAggregators";
+
+  /** Registry holding the one {@link AggregatorMetric} of this source. */
+  private final MetricRegistry registry;
+
+  /**
+   * @param aggregators the aggregators to publish through this source
+   */
+  public AggregatorMetricSource(final NamedAggregators aggregators) {
+    final MetricRegistry newRegistry = new MetricRegistry();
+    newRegistry.register(SOURCE_NAME, AggregatorMetric.of(aggregators));
+    registry = newRegistry;
+  }
+
+  @Override
+  public String sourceName() {
+    return SOURCE_NAME;
+  }
+
+  @Override
+  public MetricRegistry metricRegistry() {
+    return registry;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
new file mode 100644
index 0000000..88e2211
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/WithNamedAggregatorsSupport.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators.metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Ordering;
+
+import java.util.Map;
+import java.util.SortedMap;
+
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link MetricRegistry} decorator-like* that supports {@link 
AggregatorMetric} by exposing
+ * the underlying * {@link 
org.apache.beam.runners.spark.aggregators.NamedAggregators}'
+ * aggregators as {@link Gauge}s.
+ * <p>
+ * *{@link MetricRegistry} is not an interface, so this is not a by-the-book 
decorator.
+ * That said, it delegates all metric related getters to the "decorated" 
instance.
+ * </p>
+ */
+public class WithNamedAggregatorsSupport extends MetricRegistry {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(WithNamedAggregatorsSupport.class);
+
+  private MetricRegistry internalMetricRegistry;
+
+  private WithNamedAggregatorsSupport(final MetricRegistry 
internalMetricRegistry) {
+    this.internalMetricRegistry = internalMetricRegistry;
+  }
+
+  public static WithNamedAggregatorsSupport forRegistry(final MetricRegistry 
metricRegistry) {
+    return new WithNamedAggregatorsSupport(metricRegistry);
+  }
+
+  @Override
+  public SortedMap<String, Timer> getTimers(final MetricFilter filter) {
+    return internalMetricRegistry.getTimers(filter);
+  }
+
+  @Override
+  public SortedMap<String, Meter> getMeters(final MetricFilter filter) {
+    return internalMetricRegistry.getMeters(filter);
+  }
+
+  @Override
+  public SortedMap<String, Histogram> getHistograms(final MetricFilter filter) 
{
+    return internalMetricRegistry.getHistograms(filter);
+  }
+
+  @Override
+  public SortedMap<String, Counter> getCounters(final MetricFilter filter) {
+    return internalMetricRegistry.getCounters(filter);
+  }
+
+  @Override
+  public SortedMap<String, Gauge> getGauges(final MetricFilter filter) {
+    return
+        new ImmutableSortedMap.Builder<String, Gauge>(
+            Ordering.from(String.CASE_INSENSITIVE_ORDER))
+            .putAll(internalMetricRegistry.getGauges(filter))
+            .putAll(extractGauges(internalMetricRegistry, filter))
+            .build();
+  }
+
+  private Map<String, Gauge> extractGauges(final MetricRegistry metricRegistry,
+                                           final MetricFilter filter) {
+
+    // find the AggregatorMetric metrics from within all currently registered 
metrics
+    final Optional<Map<String, Gauge>> gauges =
+        FluentIterable
+            .from(metricRegistry.getMetrics().entrySet())
+            .firstMatch(isAggregatorMetric())
+            .transform(toGauges());
+
+    return
+        gauges.isPresent()
+            ? Maps.filterEntries(gauges.get(), matches(filter))
+            : ImmutableMap.<String, Gauge>of();
+  }
+
+  private Function<Map.Entry<String, Metric>, Map<String, Gauge>> toGauges() {
+    return new Function<Map.Entry<String, Metric>, Map<String, Gauge>>() {
+      @Override
+      public Map<String, Gauge> apply(final Map.Entry<String, Metric> entry) {
+        final NamedAggregators agg = ((AggregatorMetric) 
entry.getValue()).getNamedAggregators();
+        final Map<String, Gauge> gaugeMap = 
Maps.transformEntries(agg.renderAll(), toGauge());
+        return Maps.filterValues(gaugeMap, Predicates.notNull());
+      }
+    };
+  }
+
+  private Maps.EntryTransformer<String, Object, Gauge> toGauge() {
+    return new Maps.EntryTransformer<String, Object, Gauge>() {
+
+      @Override
+      public Gauge transformEntry(final String name, final Object rawValue) {
+        return new Gauge<Double>() {
+
+          @Override
+          public Double getValue() {
+            // at the moment the metric's type is assumed to be
+            // compatible with Double. While far from perfect, it seems 
reasonable at
+            // this point in time
+            try {
+              return Double.parseDouble(rawValue.toString());
+            } catch (final Exception e) {
+              LOG.warn("Failed reporting metric with name [{}], of type [{}], 
since it could not be"
+                  + " converted to double", name, 
rawValue.getClass().getSimpleName(), e);
+              return null;
+            }
+          }
+        };
+      }
+    };
+  }
+
+  private Predicate<Map.Entry<String, Gauge>> matches(final MetricFilter 
filter) {
+    return new Predicate<Map.Entry<String, Gauge>>() {
+      @Override
+      public boolean apply(final Map.Entry<String, Gauge> entry) {
+        return filter.matches(entry.getKey(), entry.getValue());
+      }
+    };
+  }
+
+  private Predicate<Map.Entry<String, Metric>> isAggregatorMetric() {
+    return new Predicate<Map.Entry<String, Metric>>() {
+      @Override
+      public boolean apply(final Map.Entry<String, Metric> metricEntry) {
+        return (metricEntry.getValue() instanceof AggregatorMetric);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/package-info.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/package-info.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/package-info.java
new file mode 100644
index 0000000..f19f635
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Defines classes for integrating with Spark's metrics mechanism (Sinks, 
Sources, etc.).
+ */
+package org.apache.beam.runners.spark.aggregators.metrics;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/CsvSink.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/CsvSink.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/CsvSink.java
new file mode 100644
index 0000000..af1601a
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/CsvSink.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators.metrics.sink;
+
+import com.codahale.metrics.MetricRegistry;
+
+import java.util.Properties;
+
+import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetric;
+import 
org.apache.beam.runners.spark.aggregators.metrics.WithNamedAggregatorsSupport;
+import org.apache.spark.metrics.sink.Sink;
+
+/**
+ * A Spark {@link Sink} writing to a CSV file that is able to report
+ * {@link AggregatorMetric} metrics: the registry it receives is wrapped with
+ * {@link WithNamedAggregatorsSupport} before being passed on to Spark's own CSV sink.
+ */
+public class CsvSink extends org.apache.spark.metrics.sink.CsvSink {
+
+  /**
+   * @param properties     handed to the parent sink as is
+   * @param metricRegistry the registry to wrap with {@link WithNamedAggregatorsSupport}
+   * @param securityMgr    handed to the parent sink as is
+   */
+  public CsvSink(
+      final Properties properties,
+      final MetricRegistry metricRegistry,
+      final org.apache.spark.SecurityManager securityMgr) {
+    super(properties, withAggregatorsSupport(metricRegistry), securityMgr);
+  }
+
+  /** Wraps the given registry so that aggregators are exposed by it as well. */
+  private static WithNamedAggregatorsSupport withAggregatorsSupport(
+      final MetricRegistry metricRegistry) {
+    return WithNamedAggregatorsSupport.forRegistry(metricRegistry);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/GraphiteSink.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/GraphiteSink.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/GraphiteSink.java
new file mode 100644
index 0000000..7a45ef7
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/GraphiteSink.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators.metrics.sink;
+
+import com.codahale.metrics.MetricRegistry;
+
+import java.util.Properties;
+
+import org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetric;
+import 
org.apache.beam.runners.spark.aggregators.metrics.WithNamedAggregatorsSupport;
+import org.apache.spark.metrics.sink.Sink;
+
+/**
+ * A Spark {@link Sink} reporting to Graphite that is able to report
+ * {@link AggregatorMetric} metrics: the registry it receives is wrapped with
+ * {@link WithNamedAggregatorsSupport} before being passed on to Spark's own Graphite sink.
+ */
+public class GraphiteSink extends org.apache.spark.metrics.sink.GraphiteSink {
+
+  /**
+   * @param properties     handed to the parent sink as is
+   * @param metricRegistry the registry to wrap with {@link WithNamedAggregatorsSupport}
+   * @param securityMgr    handed to the parent sink as is
+   */
+  public GraphiteSink(
+      final Properties properties,
+      final MetricRegistry metricRegistry,
+      final org.apache.spark.SecurityManager securityMgr) {
+    super(properties, withAggregatorsSupport(metricRegistry), securityMgr);
+  }
+
+  /** Wraps the given registry so that aggregators are exposed by it as well. */
+  private static WithNamedAggregatorsSupport withAggregatorsSupport(
+      final MetricRegistry metricRegistry) {
+    return WithNamedAggregatorsSupport.forRegistry(metricRegistry);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/package-info.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/package-info.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/package-info.java
new file mode 100644
index 0000000..2e6dd0d
--- /dev/null
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/metrics/sink/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Spark sinks that support
+ * the {@link 
org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetric}.
+ */
+package org.apache.beam.runners.spark.aggregators.metrics.sink;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
index eefea77..b1c567c 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/ConsoleIO.java
@@ -47,7 +47,7 @@ public final class ConsoleIO {
 
     /**
      * {@link PTransform} writing {@link PCollection} on the console.
-     * @param <T>
+     * @param <T> the type of the elements in the {@link PCollection}
      */
     public static class Unbound<T> extends PTransform<PCollection<T>, PDone> {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
index 7b10610..70bec78 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/hadoop/HadoopIO.java
@@ -55,8 +55,8 @@ public final class HadoopIO {
 
     /**
      * A {@link PTransform} reading bounded collection of data from HDFS.
-     * @param <K>
-     * @param <V>
+     * @param <K> the type of the keys
+     * @param <V> the type of the values
      */
     public static class Bound<K, V> extends PTransform<PInput, 
PCollection<KV<K, V>>> {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
index 2634c65..4e4cd1a 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkRuntimeContext.java
@@ -20,14 +20,19 @@ package org.apache.beam.runners.spark.translation;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+
 import com.google.common.collect.ImmutableList;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+
+import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.aggregators.AggAccumParam;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+import 
org.apache.beam.runners.spark.aggregators.metrics.AggregatorMetricSource;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
@@ -41,7 +46,9 @@ import org.apache.beam.sdk.transforms.Min;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.spark.Accumulator;
+import org.apache.spark.SparkEnv$;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.metrics.MetricsSystem;
 
 
 /**
@@ -63,8 +70,9 @@ public class SparkRuntimeContext implements Serializable {
   private transient CoderRegistry coderRegistry;
 
   SparkRuntimeContext(JavaSparkContext jsc, Pipeline pipeline) {
-    this.accum = jsc.accumulator(new NamedAggregators(), new AggAccumParam());
-    this.serializedPipelineOptions = 
serializePipelineOptions(pipeline.getOptions());
+    final SparkPipelineOptions opts = 
pipeline.getOptions().as(SparkPipelineOptions.class);
+    accum = registerMetrics(jsc, opts);
+    serializedPipelineOptions = serializePipelineOptions(opts);
   }
 
   private static String serializePipelineOptions(PipelineOptions 
pipelineOptions) {
@@ -83,6 +91,23 @@ public class SparkRuntimeContext implements Serializable {
     }
   }
 
+  private Accumulator<NamedAggregators> registerMetrics(final JavaSparkContext 
jsc,
+                                                        final 
SparkPipelineOptions opts) {
+    final NamedAggregators initialValue = new NamedAggregators();
+    final Accumulator<NamedAggregators> accum = jsc.accumulator(initialValue, 
new AggAccumParam());
+    // Expose the aggregators to Spark's metrics system only when Spark sinks are enabled.
+    if (opts.getEnableSparkSinks()) {
+      final MetricsSystem metricsSystem = 
SparkEnv$.MODULE$.get().metricsSystem();
+      final AggregatorMetricSource aggregatorMetricSource =
+          new AggregatorMetricSource(initialValue);
+      // Remove any existing registration first, in case the context was not cleared.
+      metricsSystem.removeSource(aggregatorMetricSource);
+      metricsSystem.registerSource(aggregatorMetricSource);
+    }
+
+    return accum;
+  }
+
   /**
    * Retrieves corresponding value of an aggregator.
    *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
index 5f0c795..5c13b80 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/BroadcastHelper.java
@@ -58,7 +58,7 @@ public abstract class BroadcastHelper<T> implements 
Serializable {
    * A {@link BroadcastHelper} that relies on the underlying
    * Spark serialization (Kryo) to broadcast values. This is appropriate when
    * broadcasting very large values, since no copy of the object is made.
-   * @param <T>
+   * @param <T> the type of the value stored in the broadcast variable
    */
   static class DirectBroadcastHelper<T> extends BroadcastHelper<T> {
     private Broadcast<T> bcast;
@@ -86,7 +86,7 @@ public abstract class BroadcastHelper<T> implements 
Serializable {
    * A {@link BroadcastHelper} that uses a
    * {@link Coder} to encode values as byte arrays
    * before broadcasting.
-   * @param <T>
+   * @param <T> the type of the value stored in the broadcast variable
    */
   static class CodedBroadcastHelper<T> extends BroadcastHelper<T> {
     private Broadcast<byte[]> bcast;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java
new file mode 100644
index 0000000..506dbbd
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/InMemoryMetricsSinkRule.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import org.apache.beam.runners.spark.aggregators.metrics.sink.InMemoryMetrics;
+import org.junit.rules.ExternalResource;
+
+/**
+ * A rule that clears the {@link InMemoryMetrics} before the test it is applied to runs.
+ *
+ * <p>Only {@link #before()} is overridden, so nothing is cleaned up after the test has
+ * finished.
+ */
+class InMemoryMetricsSinkRule extends ExternalResource {
+  @Override
+  protected void before() throws Throwable {
+    InMemoryMetrics.clearAll();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index f644765..8b7762f 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -19,6 +19,8 @@
 package org.apache.beam.runners.spark;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableSet;
@@ -27,6 +29,7 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
+import org.apache.beam.runners.spark.aggregators.metrics.sink.InMemoryMetrics;
 import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -39,12 +42,17 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.io.FileUtils;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExternalResource;
 import org.junit.rules.TemporaryFolder;
 
 /**
  * Simple word count test.
  */
 public class SimpleWordCountTest {
+
+  @Rule
+  public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
+
   private static final String[] WORDS_ARRAY = {
       "hi there", "hi", "hi sue bob",
       "hi sue", "", "bob hi"};
@@ -54,6 +62,8 @@ public class SimpleWordCountTest {
 
   @Test
   public void testInMem() throws Exception {
+    assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue()));
+
     SparkPipelineOptions options = 
PipelineOptionsFactory.as(SparkPipelineOptions.class);
     options.setRunner(SparkRunner.class);
     Pipeline p = Pipeline.create(options);
@@ -66,6 +76,8 @@ public class SimpleWordCountTest {
 
     EvaluationResult res = (EvaluationResult) p.run();
     res.close();
+
+    assertThat(InMemoryMetrics.<Double>valueOf("emptyLines"), is(1d));
   }
 
   @Rule

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
new file mode 100644
index 0000000..35e6717
--- /dev/null
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/InMemoryMetrics.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.aggregators.metrics.sink;
+
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.MetricRegistry;
+
+import java.util.Properties;
+
+import 
org.apache.beam.runners.spark.aggregators.metrics.WithNamedAggregatorsSupport;
+import org.apache.spark.metrics.sink.Sink;
+
+/**
+ * An in-memory {@link Sink} implementation for tests.
+ *
+ * <p>The constructor stores the supplied {@link MetricRegistry}, together with the
+ * {@link WithNamedAggregatorsSupport} obtained from it, in static fields; the static
+ * accessors therefore operate on the registry of the most recently constructed instance.
+ * The {@code properties} and {@code securityMgr} constructor arguments are not used
+ * (presumably they are only there to match the constructor signature Spark expects of a
+ * sink -- TODO confirm).
+ *
+ * <p>{@link #valueOf(String)} returns the value of the gauge registered under the given
+ * name, or {@code null} when no instance has been constructed yet or no gauge with that
+ * name exists. The cast to the caller-chosen type is unchecked.
+ *
+ * <p>{@link #clearAll()} removes every metric from the stored registry and does nothing
+ * when no registry has been stored yet.
+ *
+ * <p>{@link #start()}, {@link #stop()} and {@link #report()} are no-ops.
+ */
+public class InMemoryMetrics implements Sink {
+
+  private static WithNamedAggregatorsSupport extendedMetricsRegistry;
+  private static MetricRegistry internalMetricRegistry;
+
+  public InMemoryMetrics(final Properties properties,
+                         final MetricRegistry metricRegistry,
+                         final org.apache.spark.SecurityManager securityMgr) {
+    extendedMetricsRegistry = 
WithNamedAggregatorsSupport.forRegistry(metricRegistry);
+    internalMetricRegistry = metricRegistry;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T> T valueOf(final String name) {
+    final T retVal;
+
+    if (extendedMetricsRegistry != null
+        && extendedMetricsRegistry.getGauges().containsKey(name)) {
+      retVal = (T) extendedMetricsRegistry.getGauges().get(name).getValue();
+    } else {
+      retVal = null;
+    }
+
+    return retVal;
+  }
+
+  public static void clearAll() {
+    if (internalMetricRegistry != null) {
+      internalMetricRegistry.removeMatching(MetricFilter.ALL);
+    }
+  }
+
+  @Override
+  public void start() {
+    // No-op.
+  }
+
+  @Override
+  public void stop() {
+    // No-op.
+  }
+
+  @Override
+  public void report() {
+    // No-op.
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/226dea2f/runners/spark/src/test/resources/metrics.properties
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/resources/metrics.properties 
b/runners/spark/src/test/resources/metrics.properties
new file mode 100644
index 0000000..4aa01d2
--- /dev/null
+++ b/runners/spark/src/test/resources/metrics.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+*.sink.memory.class=org.apache.beam.runners.spark.aggregators.metrics.sink.InMemoryMetrics
+
+#*.sink.csv.class=org.apache.beam.runners.spark.aggregators.metrics.sink.CsvSink
+#*.sink.csv.directory=/tmp/spark-metrics
+#*.sink.csv.period=1
+#*.sink.csv.unit=SECONDS
+
+#*.sink.graphite.class=org.apache.beam.runners.spark.aggregators.metrics.sink.GraphiteSink
+#*.sink.graphite.host=YOUR_HOST
+#*.sink.graphite.port=2003
+#*.sink.graphite.prefix=spark
+#*.sink.graphite.period=1
+#*.sink.graphite.unit=SECONDS


Reply via email to