Repository: beam
Updated Branches:
  refs/heads/master e5afbb27f -> c442ef81a


[BEAM-1304] Checking for nullity before trying to obtain an aggregator's value.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/882c654b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/882c654b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/882c654b

Branch: refs/heads/master
Commit: 882c654b1a8aefd2e4281d786448734731db7816
Parents: e5afbb2
Author: Stas Levin <stasle...@gmail.com>
Authored: Sun Feb 5 12:17:35 2017 +0200
Committer: Stas Levin <stasle...@gmail.com>
Committed: Sun Feb 5 15:51:18 2017 +0200

----------------------------------------------------------------------
 .../runners/spark/aggregators/NamedAggregators.java    |  6 ++++--
 .../aggregators/metrics/sink/NamedAggregatorsTest.java | 13 +++++++++++++
 2 files changed, 17 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/882c654b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index b5aec32..c876c07 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -68,10 +68,12 @@ public class NamedAggregators implements Serializable {
    * @param name      Name of aggregator to retrieve.
    * @param typeClass Type class to cast the value to.
    * @param <T>       Type to be returned.
-   * @return the value of the aggregator associated with the specified name
+   * @return the value of the aggregator associated with the specified name,
+   * or <code>null</code> if the specified aggregator could not be found.
    */
   public <T> T getValue(String name, Class<T> typeClass) {
-    return typeClass.cast(mNamedAggregators.get(name).render());
+    final State<?, ?, ?> state = mNamedAggregators.get(name);
+    return state != null ? typeClass.cast(state.render()) : null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/882c654b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
index 3b5dd21..8646510 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
@@ -28,10 +28,13 @@ import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.aggregators.ClearAggregatorsRule;
+import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -95,4 +98,14 @@ public class NamedAggregatorsTest {
     assertThat(InMemoryMetrics.<Double>valueOf("emptyLines"), is(1d));
 
   }
+
+  @Test
+  public void testNonExistingAggregatorName() throws Exception {
+    final SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    final Long valueOf =
+        SparkAggregators.valueOf(
+            "myMissingAggregator", Long.class, SparkContextFactory.getSparkContext(options));
+
+    assertThat(valueOf, is(nullValue()));
+  }
 }

Reply via email to