[2/2] beam git commit: This closes #3840

2017-09-13 Thread staslevin
This closes #3840


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/50532f0a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/50532f0a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/50532f0a

Branch: refs/heads/master
Commit: 50532f0a92d7ce8dbbdc6c3179ab7b9efde6a746
Parents: 8d71ebf c3d4c5d
Author: Stas Levin 
Authored: Wed Sep 13 11:04:20 2017 +0300
Committer: Stas Levin 
Committed: Wed Sep 13 11:04:20 2017 +0300

--
 .../SparkGroupAlsoByWindowViaWindowSet.java | 82 +---
 .../spark/stateful/SparkTimerInternals.java | 15 
 2 files changed, 56 insertions(+), 41 deletions(-)
--




[1/2] beam git commit: [BEAM-2859] Fixed processing timers not being properly fired when watermark stays put by tweaking the way spark-runner was delivering timers to reduceFnRunner in SparkGroupAlsoByWindowViaWindowSet

2017-09-13 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master 8d71ebf82 -> 50532f0a9


[BEAM-2859] Fixed processing timers not being properly fired when watermark 
stays put by tweaking the way spark-runner was delivering timers to 
reduceFnRunner in SparkGroupAlsoByWindowViaWindowSet


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c3d4c5d9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c3d4c5d9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c3d4c5d9

Branch: refs/heads/master
Commit: c3d4c5d98cc115dce7e03e64cd29713562ff62b3
Parents: 8d71ebf
Author: Stas Levin 
Authored: Tue Sep 12 10:34:45 2017 +0300
Committer: Stas Levin 
Committed: Wed Sep 13 11:04:08 2017 +0300

--
 .../SparkGroupAlsoByWindowViaWindowSet.java | 82 +---
 .../spark/stateful/SparkTimerInternals.java | 15 
 2 files changed, 56 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/c3d4c5d9/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 2258f05..1fb8700 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -18,7 +18,9 @@
 package org.apache.beam.runners.spark.stateful;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
 import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Table;
 import java.io.Serializable;
@@ -51,6 +53,7 @@ import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -204,6 +207,32 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
 this.droppedDueToLateness = droppedDueToLateness;
   }
 
+  /**
+   * Retrieves the timers that are eligible for processing by {@link
+   * org.apache.beam.runners.core.ReduceFnRunner}.
+   *
+   * @return A collection of timers that are eligible for processing. For a {@link
+   *     TimeDomain#EVENT_TIME} timer, this implies that the watermark has passed the timer's
+   *     timestamp. For other TimeDomains (e.g., {@link TimeDomain#PROCESSING_TIME}), a timer is
+   *     always considered eligible for processing (no restrictions).
+   */
+  private Collection<TimerInternals.TimerData> filterTimersEligibleForProcessing(
+      final Collection<TimerInternals.TimerData> timers, final Instant inputWatermark) {
+    final Predicate<TimerInternals.TimerData> eligibleForProcessing =
+        new Predicate<TimerInternals.TimerData>() {
+
+          @Override
+          public boolean apply(final TimerInternals.TimerData timer) {
+            return !timer.getDomain().equals(TimeDomain.EVENT_TIME)
+                || inputWatermark.isAfter(timer.getTimestamp());
+          }
+        };
+
+    return FluentIterable.from(timers).filter(eligibleForProcessing).toSet();
+  }
+
+
   @Override
   protected Tuple2<ByteArray, Tuple2<StateAndTimers, /*WV<KV<K, Itr<I>>>*/ List<byte[]>>>
       computeNext() {
@@ -268,16 +297,14 @@ public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
 
   LOG.trace(logPrefix + ": input elements: {}", elements);
 
-      /*
-      Incoming expired windows are filtered based on
-      timerInternals.currentInputWatermarkTime() and the configured allowed
-      lateness. Note that this is done prior to calling
-      timerInternals.advanceWatermark so essentially the inputWatermark is
-      the highWatermark of the previous batch and the lowWatermark of the
-      current batch.
-      The highWatermark of the current batch will only affect filtering
-      as of the next batch.
-       */
+      // Incoming expired windows are filtered based on
+      // timerInternals.currentInputWatermarkTime() and the configured allowed
+      // lateness. Note that this is done prior to calling
+      // timerInternals.advanceWatermark so essentially the inputWatermark is
+      // the highWatermark of the previous batch and the lowWatermark of the
+      // current batch.
+      // The highWatermark of the current batch will only affect filtering
+      // as of the next batch.
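The hunk above is truncated by the archive. As a self-contained restatement of the eligibility rule the Javadoc describes (an event-time timer is eligible only once the input watermark has passed its timestamp; timers in any other time domain are always eligible), here is a minimal sketch built from the same Guava and Beam classes the diff imports; the class and method names are illustrative, not the runner's API:

import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import java.util.Collection;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.state.TimeDomain;
import org.joda.time.Instant;

class TimerFilterSketch {
  // Keep a timer if it is not an event-time timer (always eligible), or if
  // the input watermark has already passed its timestamp.
  static Collection<TimerInternals.TimerData> filterTimers(
      final Collection<TimerInternals.TimerData> timers, final Instant inputWatermark) {
    return FluentIterable.from(timers)
        .filter(
            new Predicate<TimerInternals.TimerData>() {
              @Override
              public boolean apply(final TimerInternals.TimerData timer) {
                return !timer.getDomain().equals(TimeDomain.EVENT_TIME)
                    || inputWatermark.isAfter(timer.getTimestamp());
              }
            })
        .toSet();
  }
}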

[1/2] beam git commit: [BEAM-2825] Refactored SparkGroupAlsoByWindowViaWindowSet to improve readability.

2017-09-03 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master a481d5611 -> f7d4583bd


[BEAM-2825] Refactored SparkGroupAlsoByWindowViaWindowSet to improve 
readability.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c8b99ba3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c8b99ba3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c8b99ba3

Branch: refs/heads/master
Commit: c8b99ba393c54da1a3ffbc61c2e5f2ae92b0b2bb
Parents: a481d56
Author: Stas Levin 
Authored: Wed Aug 30 12:01:32 2017 +0300
Committer: Stas Levin 
Committed: Sun Sep 3 15:40:25 2017 +0300

--
 .../SparkGroupAlsoByWindowViaWindowSet.java | 878 +++
 1 file changed, 498 insertions(+), 380 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/c8b99ba3/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index e6a55a6..2258f05 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -58,12 +58,12 @@ import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext$;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.Time;
 import org.apache.spark.streaming.api.java.JavaDStream;
@@ -73,435 +73,553 @@ import org.apache.spark.streaming.dstream.PairDStreamFunctions;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Function1;
 import scala.Option;
 import scala.Tuple2;
 import scala.Tuple3;
+import scala.collection.Iterator;
 import scala.collection.Seq;
-import scala.reflect.ClassTag;
 import scala.runtime.AbstractFunction1;
 
 /**
- * An implementation of {@link GroupAlsoByWindow}
- * logic for grouping by windows and controlling trigger firings and pane accumulation.
+ * An implementation of {@link GroupAlsoByWindow} logic for grouping by windows and controlling
+ * trigger firings and pane accumulation.
  *
  * This implementation is a composite of Spark transformations revolving around state management
- * using Spark's
- * {@link PairDStreamFunctions#updateStateByKey(Function1, Partitioner, boolean, ClassTag)}
- * to update state with new data and timers.
+ * using Spark's {@link PairDStreamFunctions#updateStateByKey(scala.Function1,
+ * org.apache.spark.Partitioner, boolean, scala.reflect.ClassTag)} to update state with new data and
+ * timers.
  *
- * Using updateStateByKey allows to scan through the entire state visiting not just the
- * updated state (new values for key) but also check if timers are ready to fire.
- * Since updateStateByKey bounds the types of state and output to be the same,
- * a (state, output) tuple is used, filtering the state (and output if no firing)
- * in the following steps.
+ * Using updateStateByKey allows to scan through the entire state visiting not just the updated
+ * state (new values for key) but also check if timers are ready to fire. Since updateStateByKey
+ * bounds the types of state and output to be the same, a (state, output) tuple is used, filtering
+ * the state (and output if no firing) in the following steps.
  */
 public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
-  private static final Logger LOG = LoggerFactory.getLogger(
-      SparkGroupAlsoByWindowViaWindowSet.class);
-
-  /**
-   * A helper class that is essentially a {@link Serializable} {@link AbstractFunction1}.
-   */
-  private abstract static class SerializableFunction1<T1, T2>
-      extends AbstractFunction1<T1, T2> implements Serializable {
-  }
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkGroupAlsoByWindowViaWindowSet.class);
 
-  public static <K, InputT, W extends BoundedWindow>
-      JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> groupAlsoByWindow(
-          final 
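The Javadoc above explains why a (state, output) tuple is threaded through updateStateByKey. A minimal sketch of that pattern, assuming Spark 1.x's Java streaming API (which takes Guava Optionals) rather than the Scala PairDStreamFunctions the runner actually calls; the running-sum state and per-batch "fired" output are illustrative stand-ins:

import com.google.common.base.Optional;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

class UpdateStateByKeySketch {
  // Because updateStateByKey emits a single type per key, the state (a running
  // sum) and the output (values "fired" this batch) travel together in one
  // Tuple2 and are split apart in later stages.
  static JavaPairDStream<String, Tuple2<Long, List<Long>>> stateAndOutput(
      final JavaPairDStream<String, Long> input) {
    return input.updateStateByKey(
        new Function2<List<Long>, Optional<Tuple2<Long, List<Long>>>,
            Optional<Tuple2<Long, List<Long>>>>() {
          @Override
          public Optional<Tuple2<Long, List<Long>>> call(
              final List<Long> newValues, final Optional<Tuple2<Long, List<Long>>> prev) {
            long sum = prev.isPresent() ? prev.get()._1() : 0L;
            final List<Long> fired = new ArrayList<>();
            for (final Long v : newValues) {
              sum += v;
              fired.add(sum);
            }
            // Crucially, this function also runs for keys with no new data,
            // which is what lets the runner check timers on every batch.
            return Optional.of(new Tuple2<>(sum, fired));
          }
        });
  }
}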

[2/2] beam git commit: This closes #3793

2017-09-03 Thread staslevin
This closes #3793


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f7d4583b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f7d4583b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f7d4583b

Branch: refs/heads/master
Commit: f7d4583bdb3ef36734125d8aa7670ad924864f4c
Parents: a481d56 c8b99ba
Author: Stas Levin 
Authored: Sun Sep 3 15:40:35 2017 +0300
Committer: Stas Levin 
Committed: Sun Sep 3 15:40:35 2017 +0300

--
 .../SparkGroupAlsoByWindowViaWindowSet.java | 878 +++
 1 file changed, 498 insertions(+), 380 deletions(-)
--




[2/2] beam git commit: This closes #3749

2017-09-03 Thread staslevin
This closes #3749


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a481d561
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a481d561
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a481d561

Branch: refs/heads/master
Commit: a481d5611925f29224724bf0e79365db8f905853
Parents: 80aebd9 ffd08da
Author: Stas Levin 
Authored: Sun Sep 3 09:04:29 2017 +0300
Committer: Stas Levin 
Committed: Sun Sep 3 09:04:29 2017 +0300

--
 .../SparkGroupAlsoByWindowViaWindowSet.java | 15 ++--
 .../spark/translation/BoundedDataset.java   | 17 -
 .../beam/runners/spark/translation/Dataset.java |  3 +-
 .../spark/translation/EvaluationContext.java| 23 --
 .../spark/translation/SparkContextFactory.java  |  2 -
 .../translation/StorageLevelPTransform.java | 37 --
 .../spark/translation/TransformTranslator.java  | 53 +
 .../spark/translation/TranslationUtils.java | 78 
 .../streaming/StreamingTransformTranslator.java | 15 +++-
 .../translation/streaming/UnboundedDataset.java | 27 +--
 .../spark/translation/StorageLevelTest.java | 75 ---
 11 files changed, 166 insertions(+), 179 deletions(-)
--




[1/2] beam git commit: [BEAM-2669] Fixed Kryo serialization exception when dstream is cached (by using coders and moving to bytes before attempting to serialise an RDD as part of caching it).

2017-09-03 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master 80aebd902 -> a481d5611


[BEAM-2669] Fixed Kryo serialization exception when dstream is cached (by using 
coders and moving to bytes before attempting to serialise an RDD as part of 
caching it).


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ffd08dae
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ffd08dae
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ffd08dae

Branch: refs/heads/master
Commit: ffd08dae0d1a6fcde438ae4e9c2a348eb2a5d493
Parents: 80aebd9
Author: ksalant 
Authored: Wed Aug 23 14:54:46 2017 +0300
Committer: Stas Levin 
Committed: Sun Sep 3 09:03:28 2017 +0300

--
 .../SparkGroupAlsoByWindowViaWindowSet.java | 15 ++--
 .../spark/translation/BoundedDataset.java   | 17 -
 .../beam/runners/spark/translation/Dataset.java |  3 +-
 .../spark/translation/EvaluationContext.java| 23 --
 .../spark/translation/SparkContextFactory.java  |  2 -
 .../translation/StorageLevelPTransform.java | 37 --
 .../spark/translation/TransformTranslator.java  | 53 +
 .../spark/translation/TranslationUtils.java | 78 
 .../streaming/StreamingTransformTranslator.java | 15 +++-
 .../translation/streaming/UnboundedDataset.java | 27 +--
 .../spark/translation/StorageLevelTest.java | 75 ---
 11 files changed, 166 insertions(+), 179 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/ffd08dae/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 52f7376..e6a55a6 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -96,7 +96,7 @@ import scala.runtime.AbstractFunction1;
  * a (state, output) tuple is used, filtering the state (and output if no firing)
  * in the following steps.
  */
-public class SparkGroupAlsoByWindowViaWindowSet {
+public class SparkGroupAlsoByWindowViaWindowSet implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(
   SparkGroupAlsoByWindowViaWindowSet.class);
 
@@ -226,8 +226,6 @@ public class SparkGroupAlsoByWindowViaWindowSet {
     final SystemReduceFn reduceFn =
 SystemReduceFn.buffering(
 ((FullWindowedValueCoder) wvCoder).getValueCoder());
-final OutputWindowedValueHolder outputHolder =
-new OutputWindowedValueHolder<>();
 // use in memory Aggregators since Spark Accumulators are not resilient
 // in stateful operators, once done with this partition.
     final MetricsContainerImpl cellProvider = new MetricsContainerImpl("cellProvider");
@@ -280,6 +278,9 @@ public class SparkGroupAlsoByWindowViaWindowSet {
         SparkTimerInternals.deserializeTimers(serTimers, timerDataCoder));
   }
 
+  final OutputWindowedValueHolder outputHolder =
+  new OutputWindowedValueHolder<>();
+
       ReduceFnRunner reduceFnRunner =
   new ReduceFnRunner<>(
   key,
@@ -294,8 +295,6 @@ public class SparkGroupAlsoByWindowViaWindowSet {
   reduceFn,
   options.get());
 
-  outputHolder.clear(); // clear before potential use.
-
   if (!seq.isEmpty()) {
 // new input for key.
 try {
@@ -457,7 +456,7 @@ public class SparkGroupAlsoByWindowViaWindowSet {
 });
   }
 
-  private static class StateAndTimers {
+  private static class StateAndTimers implements Serializable {
 //Serializable state for internals (namespace to state tag to coded value).
     private final Table<String, String, byte[]> state;
     private final Collection<byte[]> serTimers;
@@ -494,10 +493,6 @@ public class SparkGroupAlsoByWindowViaWindowSet {
   return windowedValues;
 }
 
-private void clear() {
-  windowedValues.clear();
-}
-
 @Override
 public  void outputWindowedValue(
 TupleTag tag,
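For context on the fix named in the commit message (moving to bytes via coders before caching), a minimal sketch of that approach using the runner's CoderHelpers, whose toByteFunction/fromByteFunction pair appears elsewhere in these diffs; the helper name and storage level are illustrative:

import org.apache.beam.runners.spark.coders.CoderHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.storage.StorageLevel;

class CacheViaCodersSketch {
  // Encode elements to byte[] with a Beam coder before caching, so Spark
  // persists plain byte arrays instead of asking Kryo to serialize arbitrary
  // Beam types, then decode again after the cached boundary.
  static <T> JavaRDD<T> cacheWithCoder(final JavaRDD<T> rdd, final Coder<T> coder) {
    final JavaRDD<byte[]> bytes = rdd.map(CoderHelpers.toByteFunction(coder));
    bytes.persist(StorageLevel.MEMORY_ONLY());
    return bytes.map(CoderHelpers.fromByteFunction(coder));
  }
}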


[2/2] beam git commit: This closes #3738

2017-08-24 Thread staslevin
This closes #3738


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5181e619
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5181e619
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5181e619

Branch: refs/heads/master
Commit: 5181e619f17e1f69fabe8d5bdfc7a3a6a2142cde
Parents: c4517d0 15472b2
Author: Stas Levin 
Authored: Thu Aug 24 09:43:16 2017 +0300
Committer: Stas Levin 
Committed: Thu Aug 24 09:43:16 2017 +0300

--
 .../apache/beam/runners/spark/SparkRunner.java  |   5 +-
 .../beam/runners/spark/io/CreateStream.java | 104 ---
 .../SparkGroupAlsoByWindowViaWindowSet.java | 158 +++---
 .../spark/stateful/SparkTimerInternals.java |   6 +
 .../streaming/StreamingTransformTranslator.java |  71 +++--
 .../streaming/WatermarkSyncedDStream.java   | 149 +
 .../spark/util/GlobalWatermarkHolder.java   | 302 +--
 .../runners/spark/SparkPipelineStateTest.java   |   4 +-
 .../translation/streaming/CreateStreamTest.java |  33 +-
 .../spark/src/test/resources/log4j.properties   |  11 +-
 10 files changed, 633 insertions(+), 210 deletions(-)
--




[1/2] beam git commit: [BEAM-2671] Implemented an InputDStream that syncs up with the watermark values, this should help with streaming tests in spark-runner.

2017-08-24 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master c4517d04c -> 5181e619f


[BEAM-2671] Implemented an InputDStream that syncs up with the watermark 
values, this should help with streaming tests in spark-runner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/15472b28
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/15472b28
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/15472b28

Branch: refs/heads/master
Commit: 15472b28c649381b90a0405d80012aa8523d13c5
Parents: c4517d0
Author: Stas Levin 
Authored: Sun Aug 20 16:48:57 2017 +0300
Committer: Stas Levin 
Committed: Thu Aug 24 09:42:12 2017 +0300

--
 .../apache/beam/runners/spark/SparkRunner.java  |   5 +-
 .../beam/runners/spark/io/CreateStream.java | 104 ---
 .../SparkGroupAlsoByWindowViaWindowSet.java | 158 +++---
 .../spark/stateful/SparkTimerInternals.java |   6 +
 .../streaming/StreamingTransformTranslator.java |  71 +++--
 .../streaming/WatermarkSyncedDStream.java   | 149 +
 .../spark/util/GlobalWatermarkHolder.java   | 302 +--
 .../runners/spark/SparkPipelineStateTest.java   |   4 +-
 .../translation/streaming/CreateStreamTest.java |  33 +-
 .../spark/src/test/resources/log4j.properties   |  11 +-
 10 files changed, 633 insertions(+), 210 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 595521f..98ca1be 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -40,7 +40,7 @@ import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
 import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
-import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarksListener;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarkAdvancingStreamingListener;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.io.Read;
@@ -171,7 +171,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
   }
 
   // register Watermarks listener to broadcast the advanced WMs.
-      jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener()));
+      jssc.addStreamingListener(
+          new JavaStreamingListenerWrapper(new WatermarkAdvancingStreamingListener()));
 
   // The reason we call initAccumulators here even though it is called in
       // SparkRunnerStreamingContextFactory is because the factory is not called when resuming

http://git-wip-us.apache.org/repos/asf/beam/blob/15472b28/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
index d485d25..4c73d95 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/CreateStream.java
@@ -41,34 +41,34 @@ import org.joda.time.Instant;
 /**
  * Create an input stream from Queue. For SparkRunner tests only.
  *
- * To properly compose a stream of micro-batches with their Watermarks, please keep in mind
- * that eventually there are two queues here - one for batches and another for Watermarks.
+ * To properly compose a stream of micro-batches with their Watermarks, please keep in mind that
+ * eventually there are two queues here - one for batches and another for Watermarks.
  *
- * While both queues advance according to Spark's batch-interval, there is a slight difference
- * in how data is pushed into the stream compared to the advancement of Watermarks since Watermarks
+ * While both queues advance according to Spark's batch-interval, there is a slight difference in
+ * how data is pushed into the stream compared to the advancement of Watermarks since Watermarks
  * advance on the onBatchCompleted hook call, so if you'd want to set the watermark advance for a
  * specific batch it should be called before that batch.
- * Also keep in mind that being a 
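The Javadoc is cut off here. To make the two-queue composition concrete, a hedged sketch of driving CreateStream from a test, using the fluent methods this class exposes; the factory signature and the values are assumptions that may differ across Beam versions:

import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

class CreateStreamSketch {
  // Each nextBatch(...) enqueues one micro-batch; each
  // advanceWatermarkForNextBatch(...) enqueues the watermark that is broadcast
  // when the following batch completes, which is why it must be called before
  // the batch it should apply to.
  static CreateStream<Integer> source(final Duration batchDuration, final Instant start) {
    return CreateStream.of(VarIntCoder.of(), batchDuration)
        .nextBatch(TimestampedValue.of(1, start), TimestampedValue.of(2, start))
        .advanceWatermarkForNextBatch(start.plus(Duration.standardSeconds(1)))
        .nextBatch(TimestampedValue.of(3, start.plus(Duration.standardSeconds(1))))
        .advanceNextBatchWatermarkToInfinity();
  }
}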

[2/2] beam git commit: This closes #2854

2017-05-09 Thread staslevin
This closes #2854


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e10fbdaa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e10fbdaa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e10fbdaa

Branch: refs/heads/master
Commit: e10fbdaa2db480efb45074d59341ab5256a42d80
Parents: d96fd17 81b89ac
Author: Stas Levin 
Authored: Tue May 9 10:09:42 2017 +0300
Committer: Stas Levin 
Committed: Tue May 9 10:09:42 2017 +0300

--
 .../apache/beam/runners/spark/io/SourceRDD.java | 173 ---
 .../spark/io/ReaderToIteratorAdapterTest.java   | 145 
 2 files changed, 260 insertions(+), 58 deletions(-)
--




[2/2] beam git commit: This closes #2733

2017-05-01 Thread staslevin
This closes #2733


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/535761a7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/535761a7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/535761a7

Branch: refs/heads/master
Commit: 535761a7451c3e1dc557a58c35f2190e05c68aac
Parents: 254470e fcb61ae
Author: Stas Levin 
Authored: Mon May 1 15:32:19 2017 +0300
Committer: Stas Levin 
Committed: Mon May 1 15:32:19 2017 +0300

--
 .../beam/runners/spark/io/SourceDStream.java| 61 +---
 1 file changed, 40 insertions(+), 21 deletions(-)
--




[1/2] beam git commit: [BEAM-2074, BEAM-2073] Fixed SourceDStream's rate control usage.

2017-05-01 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master 254470e62 -> 535761a74


[BEAM-2074,BEAM-2073] Fixed SourceDStream's rate control usage.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fcb61ae6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fcb61ae6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fcb61ae6

Branch: refs/heads/master
Commit: fcb61ae603ba61ed94bbbe75f4d5c8257eaa1c32
Parents: 254470e
Author: Stas Levin 
Authored: Wed Apr 26 11:46:31 2017 +0300
Committer: Stas Levin 
Committed: Mon May 1 14:13:43 2017 +0300

--
 .../beam/runners/spark/io/SourceDStream.java| 61 +---
 1 file changed, 40 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/fcb61ae6/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index d8f414a..20aca5f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -99,7 +99,7 @@ class SourceDStream
 this.initialParallelism = ssc().sparkContext().defaultParallelism();
     checkArgument(this.initialParallelism > 0, "Number of partitions must be greater than zero.");
 
-    this.boundMaxRecords = boundMaxRecords > 0 ? boundMaxRecords : rateControlledMaxRecords();
+this.boundMaxRecords = boundMaxRecords;
 
 try {
   this.numPartitions =
@@ -124,14 +124,34 @@ class SourceDStream
 
 
   private MicrobatchSource createMicrobatchSource() {
-return new MicrobatchSource<>(
-unboundedSource,
-boundReadDuration,
-initialParallelism,
-boundMaxRecords,
--1,
-id(),
-readerCacheInterval);
+return new MicrobatchSource<>(unboundedSource,
+  boundReadDuration,
+  initialParallelism,
+  computeReadMaxRecords(),
+  -1,
+  id(),
+  readerCacheInterval);
+  }
+
+  private long computeReadMaxRecords() {
+    if (boundMaxRecords > 0) {
+      LOG.info("Max records per batch has been set to {}, as configured in the PipelineOptions.",
+          boundMaxRecords);
+      return boundMaxRecords;
+    } else {
+      final scala.Option<Long> rateControlledMax = rateControlledMaxRecords();
+      if (rateControlledMax.isDefined()) {
+        LOG.info("Max records per batch has been set to {}, as advised by the rate controller.",
+            rateControlledMax.get());
+        return rateControlledMax.get();
+      } else {
+        LOG.info("Max records per batch has not been limited by either configuration "
+            + "or the rate controller, and will remain unlimited for the current batch "
+            + "({}).",
+            Long.MAX_VALUE);
+        return Long.MAX_VALUE;
+      }
+    }
   }
 
   @Override
@@ -170,19 +190,18 @@ class SourceDStream
 
   // Bound by records.
 
-  private long rateControlledMaxRecords() {
-    scala.Option<RateController> rateControllerOption = rateController();
-    if (rateControllerOption.isDefined()) {
-      long rateLimitPerSecond = rateControllerOption.get().getLatestRate();
-      if (rateLimitPerSecond > 0) {
-        long totalRateLimit =
-            rateLimitPerSecond * (ssc().graph().batchDuration().milliseconds() / 1000);
-        LOG.info("RateController set limit to {}", totalRateLimit);
-        return totalRateLimit;
-      }
+  private scala.Option<Long> rateControlledMaxRecords() {
+    final scala.Option<RateController> rateControllerOption = rateController();
+    final scala.Option<Long> rateLimitPerBatch;
+    final long rateLimitPerSec;
+    if (rateControllerOption.isDefined()
+        && ((rateLimitPerSec = rateControllerOption.get().getLatestRate()) > 0)) {
+      final long batchDurationSec = ssc().graph().batchDuration().milliseconds() / 1000;
+      rateLimitPerBatch = scala.Option.apply(rateLimitPerSec * batchDurationSec);
+    } else {
+      rateLimitPerBatch = scala.Option.empty();
     }
-    LOG.info("RateController had nothing to report, default is Long.MAX_VALUE");
-    return Long.MAX_VALUE;
+    return rateLimitPerBatch;
   }
 
   private final 
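Restating the precedence the two hunks above implement as a tiny self-contained helper (the method and parameter names are illustrative, not the runner's API): an explicitly configured bound wins, then the rate controller's per-batch advice, otherwise reads are unbounded.

import scala.Option;

class ReadBoundSketch {
  // Mirrors computeReadMaxRecords(): configured bound, else rate-controlled
  // advice (ratePerSecond * batchDurationSeconds), else Long.MAX_VALUE.
  static long readMaxRecords(final long configuredBound, final Option<Long> rateControlledMax) {
    if (configuredBound > 0) {
      return configuredBound;
    }
    return rateControlledMax.isDefined() ? rateControlledMax.get() : Long.MAX_VALUE;
  }
}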

[2/2] beam git commit: This closes #2698

2017-05-01 Thread staslevin
This closes #2698


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/254470e6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/254470e6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/254470e6

Branch: refs/heads/master
Commit: 254470e626edb6a013ba5cb2d3312dc3bfbdcb51
Parents: b414f8d 3b6f4f6
Author: Stas Levin 
Authored: Mon May 1 14:07:20 2017 +0300
Committer: Stas Levin 
Committed: Mon May 1 14:07:20 2017 +0300

--
 .../beam/runners/spark/io/MicrobatchSource.java | 188 +--
 .../beam/runners/spark/io/SourceDStream.java|  12 +-
 .../apache/beam/runners/spark/io/SourceRDD.java |  11 +-
 .../spark/stateful/StateSpecFunctions.java  |  30 +--
 4 files changed, 125 insertions(+), 116 deletions(-)
--




[2/2] beam git commit: This closes #2403

2017-04-05 Thread staslevin
This closes #2403


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/646cbdb7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/646cbdb7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/646cbdb7

Branch: refs/heads/master
Commit: 646cbdb7087fc68b7c2ff175ffbebac5f58688fd
Parents: 6df661b d96a95c
Author: Stas Levin 
Authored: Wed Apr 5 10:26:16 2017 +0300
Committer: Stas Levin 
Committed: Wed Apr 5 10:26:16 2017 +0300

--
 .../apache/beam/sdk/testing/TestPipeline.java   | 76 +---
 .../org/apache/beam/sdk/io/TFRecordIOTest.java  |  7 +-
 2 files changed, 57 insertions(+), 26 deletions(-)
--




[1/2] beam git commit: [BEAM-1777] In certain circumstances PipelineEnforcement shades pipeline's exceptions.

2017-04-05 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master 6df661b0e -> 646cbdb70


[BEAM-1777] In certain circumstances PipelineEnforcement shades pipeline's 
exceptions.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d96a95c5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d96a95c5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d96a95c5

Branch: refs/heads/master
Commit: d96a95c5ed561721dcc4cde16bbd3a3308e6f18e
Parents: 6df661b
Author: Stas Levin 
Authored: Sun Apr 2 14:09:43 2017 +0300
Committer: Stas Levin 
Committed: Wed Apr 5 10:26:08 2017 +0300

--
 .../apache/beam/sdk/testing/TestPipeline.java   | 76 +---
 .../org/apache/beam/sdk/io/TFRecordIOTest.java  |  7 +-
 2 files changed, 57 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/d96a95c5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 6a8335e..a4ab196 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -106,7 +106,7 @@ public class TestPipeline extends Pipeline implements TestRule {
 
 protected final Pipeline pipeline;
 
-private boolean runInvoked;
+protected boolean runAttempted;
 
 private PipelineRunEnforcement(final Pipeline pipeline) {
   this.pipeline = pipeline;
@@ -116,12 +116,14 @@ public class TestPipeline extends Pipeline implements TestRule {
   enableAutoRunIfMissing = enable;
 }
 
-protected void afterPipelineExecution() {
-  runInvoked = true;
+protected void beforePipelineExecution() {
+  runAttempted = true;
 }
 
-protected void afterTestCompletion() {
-  if (!runInvoked && enableAutoRunIfMissing) {
+protected void afterPipelineExecution() {}
+
+protected void afterUserCodeFinished() {
+  if (!runAttempted && enableAutoRunIfMissing) {
 pipeline.run().waitUntilFinish();
   }
 }
@@ -174,27 +176,38 @@ public class TestPipeline extends Pipeline implements TestRule {
 }
 
     private void verifyPipelineExecution() {
-      final List pipelineNodes = recordPipelineNodes(pipeline);
-      if (runVisitedNodes != null && !runVisitedNodes.equals(pipelineNodes)) {
-        final boolean hasDanglingPAssert =
-            FluentIterable.from(pipelineNodes)
-                .filter(Predicates.not(Predicates.in(runVisitedNodes)))
-                .anyMatch(isPAssertNode);
-        if (hasDanglingPAssert) {
-          throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s).");
-        } else {
-          throw new AbandonedNodeException("The pipeline contains abandoned PTransform(s).");
-        }
-      } else if (runVisitedNodes == null && !enableAutoRunIfMissing) {
-        if (!isEmptyPipeline(pipeline)) {
+      if (!isEmptyPipeline(pipeline)) {
+        if (!runAttempted && !enableAutoRunIfMissing) {
           throw new PipelineRunMissingException(
               "The pipeline has not been run (runner: "
                   + pipeline.getOptions().getRunner().getSimpleName()
                   + ")");
+
+        } else {
+          final List pipelineNodes = recordPipelineNodes(pipeline);
+          if (pipelineRunSucceeded() && !visitedAll(pipelineNodes)) {
+            final boolean hasDanglingPAssert =
+                FluentIterable.from(pipelineNodes)
+                    .filter(Predicates.not(Predicates.in(runVisitedNodes)))
+                    .anyMatch(isPAssertNode);
+            if (hasDanglingPAssert) {
+              throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s).");
+            } else {
+              throw new AbandonedNodeException("The pipeline contains abandoned PTransform(s).");
+            }
+          }
         }
       }
     }
 
+    private boolean visitedAll(final List pipelineNodes) {
+      return runVisitedNodes.equals(pipelineNodes);
+    }
+
+    private boolean pipelineRunSucceeded() {
+      return runVisitedNodes != null;
+    }
+
 @Override
 protected void afterPipelineExecution() {
   runVisitedNodes = recordPipelineNodes(pipeline);
@@ -202,8 +215,8 @@ public class TestPipeline extends Pipeline implements TestRule {
 }
 
 @Override
-protected void afterTestCompletion() {
-  super.afterTestCompletion();
+protected void afterUserCodeFinished() {
+  super.afterUserCodeFinished();
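As a usage-level illustration of what the reworked enforcement catches, a hedged sketch of a JUnit test that builds a pipeline but never runs it; with auto-run disabled, the verifyPipelineExecution() shown above rejects it with PipelineRunMissingException (the transform is arbitrary):

import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.junit.Rule;
import org.junit.Test;

public class EnforcementSketchTest {
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void missingRunIsReported() {
    pipeline.apply(Create.of(1, 2, 3));
    // No pipeline.run() here: afterUserCodeFinished() sees runAttempted == false
    // and the rule fails the test with PipelineRunMissingException.
  }
}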
   

[2/2] beam git commit: This closes #2073

2017-04-02 Thread staslevin
This closes #2073


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fe1d4124
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fe1d4124
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fe1d4124

Branch: refs/heads/master
Commit: fe1d4124e7d51ec45869fb9e95cc8243f5891553
Parents: ea33e33 38e00f3
Author: Stas Levin 
Authored: Sun Apr 2 13:07:28 2017 +0300
Committer: Stas Levin 
Committed: Sun Apr 2 13:07:28 2017 +0300

--
 .../runners/spark/io/SparkUnboundedSource.java  | 36 ++--
 .../spark/stateful/StateSpecFunctions.java  | 21 +---
 2 files changed, 50 insertions(+), 7 deletions(-)
--




[1/2] beam git commit: [BEAM-1048] Added a read duration metric to SparkUnboundedSource.

2017-04-02 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master ea33e3373 -> fe1d4124e


[BEAM-1048] Added a read duration metric to SparkUnboundedSource.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/38e00f3f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/38e00f3f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/38e00f3f

Branch: refs/heads/master
Commit: 38e00f3f01e0125f6ac929da0a9d1c93fcc6787d
Parents: ea33e33
Author: Stas Levin 
Authored: Sun Apr 2 08:46:14 2017 +0300
Committer: Stas Levin 
Committed: Sun Apr 2 13:07:02 2017 +0300

--
 .../runners/spark/io/SparkUnboundedSource.java  | 36 ++--
 .../spark/stateful/StateSpecFunctions.java  | 21 +---
 2 files changed, 50 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/38e00f3f/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index 162bca4..6b34590 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -18,6 +18,8 @@
 
 package org.apache.beam.runners.spark.io;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
@@ -32,6 +34,10 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.metrics.Gauge;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.metrics.MetricsContainer;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -105,7 +111,8 @@ public class SparkUnboundedSource {
     JavaDStream<Metadata> metadataDStream = mapWithStateDStream.map(new Tuple2MetadataFunction());
 
     // register ReadReportDStream to report information related to this read.
-    new ReadReportDStream(metadataDStream.dstream(), id, getSourceName(source, id)).register();
+    new ReadReportDStream(metadataDStream.dstream(), id, getSourceName(source, id), stepName)
+        .register();
 
 // output the actual (deserialized) stream.
 WindowedValue.FullWindowedValueCoder coder =
@@ -148,18 +155,25 @@ public class SparkUnboundedSource {
   * Updates {@link MetricsAccumulator} with metrics reported in the read.
*/
   private static class ReadReportDStream extends DStream {
+
+private static final String READ_DURATION_MILLIS = "readDurationMillis";
+private static final String NAMESPACE = "spark-runner.io";
+
 private final DStream parent;
 private final int inputDStreamId;
 private final String sourceName;
+private final String stepName;
 
 ReadReportDStream(
 DStream parent,
 int inputDStreamId,
-String sourceName) {
+String sourceName,
+String stepName) {
   super(parent.ssc(), JavaSparkContext$.MODULE$.fakeClassTag());
   this.parent = parent;
   this.inputDStreamId = inputDStreamId;
   this.sourceName = sourceName;
+  this.stepName = stepName;
 }
 
 @Override
@@ -182,6 +196,7 @@ public class SparkUnboundedSource {
   SparkWatermarks sparkWatermark = null;
   Instant globalLowWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE;
   Instant globalHighWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE;
+  long maxReadDuration = 0;
   if (parentRDDOpt.isDefined()) {
 JavaRDD parentRDD = parentRDDOpt.get().toJavaRDD();
 for (Metadata metadata: parentRDD.collect()) {
@@ -196,6 +211,16 @@ public class SparkUnboundedSource {
   globalHighWatermarkForBatch.isBefore(partitionHighWatermark)
   ? partitionHighWatermark : globalHighWatermarkForBatch;
   // Update metrics reported in the read
+  final Gauge gauge = Metrics.gauge(NAMESPACE, READ_DURATION_MILLIS);
+          final MetricsContainer container = metadata.getMetricsContainer().getContainer(stepName);
+          try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(container)) {
+final long readDurationMillis = metadata.getReadDurationMillis();
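The hunk is truncated here. A hedged sketch of how the scoped container is typically used to report such a value (Gauge#set is the standard Beam metrics API; the surrounding names come from the hunk above, and the commit's exact statements may differ):

// Illustrative continuation only:
try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(container)) {
  final long readDurationMillis = metadata.getReadDurationMillis();
  maxReadDuration = Math.max(maxReadDuration, readDurationMillis);
  gauge.set(readDurationMillis); // report this partition's read duration
} catch (IOException e) {
  throw new RuntimeException(e);
}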

[1/2] beam git commit: Added a specific signature so that Kryo doesn't have to look for it using reflective exploration.

2017-03-30 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master 2a40534e8 -> 22d368b40


Added a specific signature so that Kryo doesn't have to look for it using 
reflective exploration.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a02c600
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a02c600
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a02c600

Branch: refs/heads/master
Commit: 1a02c60032f5733a6b2bae8a5853864fea86c035
Parents: 2a40534
Author: Stas Levin 
Authored: Wed Mar 29 15:33:17 2017 +0300
Committer: Stas Levin 
Committed: Thu Mar 30 15:00:43 2017 +0300

--
 .../runners/spark/coders/StatelessJavaSerializer.java| 11 +++
 1 file changed, 11 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/1a02c600/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
index b29cf0c..01b3b93 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
@@ -54,6 +54,17 @@ import java.io.ObjectStreamClass;
  * 
  */
 class StatelessJavaSerializer extends Serializer {
+
+  // Since Kryo uses reflection to sequentially look for constructor signatures, starting
+  // with this particular signature spares exploring further ones, which involves
+  // NoSuchMethodException(s) being thrown as part of the exploration process and may slow
+  // things down, see Kryo#newSerializer(), see https://goo.gl/Jn425G
+  public StatelessJavaSerializer(final Kryo ignore1, final Class ignore2) {}
+
+  public StatelessJavaSerializer() {
+this(null, null);
+  }
+
   @SuppressWarnings("unchecked")
   public void write(Kryo kryo, Output output, Object object) {
 try {



[2/2] beam git commit: This closes #2356

2017-03-30 Thread staslevin
This closes #2356


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2a40534e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2a40534e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2a40534e

Branch: refs/heads/master
Commit: 2a40534e80fb84d969ac16bd0d62618109ee04b4
Parents: 769398e 3876f83
Author: Stas Levin 
Authored: Thu Mar 30 14:53:57 2017 +0300
Committer: Stas Levin 
Committed: Thu Mar 30 14:53:57 2017 +0300

--
 .../runners/spark/io/SparkUnboundedSource.java  | 37 
 .../spark/stateful/StateSpecFunctions.java  | 17 +
 2 files changed, 31 insertions(+), 23 deletions(-)
--




[1/2] beam git commit: Extracted captures to static classes to prevent them from capturing the scope.

2017-03-30 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master 769398e40 -> 2a40534e8


Extracted captures to static classes to prevent them from capturing the scope.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3876f83a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3876f83a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3876f83a

Branch: refs/heads/master
Commit: 3876f83a82845e3c0a41152cf7a7c58378d994e7
Parents: 769398e
Author: Stas Levin 
Authored: Wed Mar 29 15:29:20 2017 +0300
Committer: Stas Levin 
Committed: Thu Mar 30 14:53:38 2017 +0300

--
 .../runners/spark/io/SparkUnboundedSource.java  | 37 
 .../spark/stateful/StateSpecFunctions.java  | 17 +
 2 files changed, 31 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/3876f83a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
index a538907..162bca4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java
@@ -102,13 +102,7 @@ public class SparkUnboundedSource {
 
 // report the number of input elements for this InputDStream to the 
InputInfoTracker.
 int id = inputDStream.inputDStream().id();
-    JavaDStream<Metadata> metadataDStream = mapWithStateDStream.map(
-        new Function<Tuple2<Iterable<byte[]>, Metadata>, Metadata>() {
-          @Override
-          public Metadata call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
-            return t2._2();
-          }
-        });
+    JavaDStream<Metadata> metadataDStream = mapWithStateDStream.map(new Tuple2MetadataFunction());
 
 // register ReadReportDStream to report information related to this read.
     new ReadReportDStream(metadataDStream.dstream(), id, getSourceName(source, id)).register();
@@ -118,13 +112,10 @@ public class SparkUnboundedSource {
 WindowedValue.FullWindowedValueCoder.of(
 source.getDefaultOutputCoder(),
 GlobalWindow.Coder.INSTANCE);
-    JavaDStream<WindowedValue<T>> readUnboundedStream = mapWithStateDStream.flatMap(
-        new FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]>() {
-          @Override
-          public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
-            return t2._1();
-          }
-        }).map(CoderHelpers.fromByteFunction(coder));
+    JavaDStream<WindowedValue<T>> readUnboundedStream =
+        mapWithStateDStream
+            .flatMap(new Tuple2byteFlatMapFunction())
+            .map(CoderHelpers.fromByteFunction(coder));
     return new UnboundedDataset<>(readUnboundedStream, Collections.singletonList(id));
   }
 
@@ -274,4 +265,22 @@ public class SparkUnboundedSource {
   return metricsContainer;
 }
   }
+
+  private static class Tuple2MetadataFunction
+      implements Function<Tuple2<Iterable<byte[]>, Metadata>, Metadata> {
+
+    @Override
+    public Metadata call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
+      return t2._2();
+    }
+  }
+
+  private static class Tuple2byteFlatMapFunction
+      implements FlatMapFunction<Tuple2<Iterable<byte[]>, Metadata>, byte[]> {
+
+    @Override
+    public Iterable<byte[]> call(Tuple2<Iterable<byte[]>, Metadata> t2) throws Exception {
+      return t2._1();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3876f83a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index ec4fce3..803fe45 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -20,11 +20,11 @@ package org.apache.beam.runners.spark.stateful;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import 
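The motivation for the change above, illustrated: an anonymous inner class keeps an implicit reference to its enclosing instance, so serializing the function drags the whole enclosing scope along; a static nested class serializes standalone. A minimal self-contained demonstration (not runner code):

import java.io.Serializable;

class CaptureDemo implements Serializable {
  private final byte[] hugeBuffer = new byte[1 << 20]; // dead weight for the closure

  interface Fn extends Serializable {
    int apply(int x);
  }

  // Anonymous class: compiles to CaptureDemo$1 with a hidden CaptureDemo.this
  // field, so serializing it also serializes hugeBuffer.
  final Fn capturing = new Fn() {
    @Override
    public int apply(final int x) {
      return x + 1;
    }
  };

  // Static nested class: no implicit outer reference, serializes on its own.
  static class NonCapturing implements Fn {
    @Override
    public int apply(final int x) {
      return x + 1;
    }
  }
}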

[1/2] beam git commit: [BEAM-1629] Init metrics/aggregators accumulators before traversing pipeline

2017-03-12 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master d16715309 -> b6ca062fc


[BEAM-1629] Init metrics/aggregators accumulators before traversing pipeline


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/874c8d0d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/874c8d0d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/874c8d0d

Branch: refs/heads/master
Commit: 874c8d0da65568b01cd5f184e303d39c7810a8bf
Parents: d167153
Author: Aviem Zur 
Authored: Mon Mar 6 20:48:48 2017 +0200
Committer: Stas Levin 
Committed: Sun Mar 12 10:02:23 2017 +0200

--
 .../spark/SparkNativePipelineVisitor.java   |  4 --
 .../beam/runners/spark/SparkPipelineResult.java |  8 +--
 .../apache/beam/runners/spark/SparkRunner.java  | 65 ++--
 .../beam/runners/spark/SparkRunnerDebugger.java | 30 ++---
 .../beam/runners/spark/TestSparkRunner.java |  4 +-
 .../aggregators/AggregatorsAccumulator.java | 44 +
 .../spark/aggregators/SparkAggregators.java | 40 ++--
 .../spark/metrics/AggregatorMetricSource.java   | 11 ++--
 .../spark/metrics/MetricsAccumulator.java   | 38 
 .../spark/metrics/SparkBeamMetricSource.java| 11 ++--
 .../spark/metrics/SparkMetricsContainer.java| 17 ++---
 .../spark/translation/TransformTranslator.java  | 13 ++--
 .../SparkRunnerStreamingContextFactory.java |  3 +
 .../streaming/StreamingTransformTranslator.java | 10 +--
 .../metrics/sink/NamedAggregatorsTest.java  | 15 +
 .../ResumeFromCheckpointStreamingTest.java  |  4 +-
 16 files changed, 156 insertions(+), 161 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
index 056da97..c2784a2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
@@ -19,7 +19,6 @@
 package org.apache.beam.runners.spark;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -27,11 +26,9 @@ import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
 import org.apache.beam.runners.spark.translation.TransformEvaluator;
-import org.apache.beam.runners.spark.translation.streaming.Checkpoint;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -55,7 +52,6 @@ public class SparkNativePipelineVisitor extends SparkRunner.Evaluator {
   SparkNativePipelineVisitor(SparkPipelineTranslator translator, EvaluationContext ctxt) {
     super(translator, ctxt);
     this.transforms = new ArrayList<>();
-    MetricsAccumulator.init(ctxt.getSparkContext(), Optional.absent());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/874c8d0d/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
index ddc1964..ed1e0c8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeoutException;
 import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.metrics.SparkMetricResults;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
-import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -84,13 +83,12 @@ public abstract class SparkPipelineResult implements PipelineResult {
   throws TimeoutException, ExecutionException, InterruptedException;
 
   

[2/2] beam git commit: This closes #2171

2017-03-12 Thread staslevin
This closes #2171


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b6ca062f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b6ca062f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b6ca062f

Branch: refs/heads/master
Commit: b6ca062fcfa31884baf08b804d04c12dee10b62e
Parents: d167153 874c8d0
Author: Stas Levin 
Authored: Sun Mar 12 10:02:30 2017 +0200
Committer: Stas Levin 
Committed: Sun Mar 12 10:02:30 2017 +0200

--
 .../spark/SparkNativePipelineVisitor.java   |  4 --
 .../beam/runners/spark/SparkPipelineResult.java |  8 +--
 .../apache/beam/runners/spark/SparkRunner.java  | 65 ++--
 .../beam/runners/spark/SparkRunnerDebugger.java | 30 ++---
 .../beam/runners/spark/TestSparkRunner.java |  4 +-
 .../aggregators/AggregatorsAccumulator.java | 44 +
 .../spark/aggregators/SparkAggregators.java | 40 ++--
 .../spark/metrics/AggregatorMetricSource.java   | 11 ++--
 .../spark/metrics/MetricsAccumulator.java   | 38 
 .../spark/metrics/SparkBeamMetricSource.java| 11 ++--
 .../spark/metrics/SparkMetricsContainer.java| 17 ++---
 .../spark/translation/TransformTranslator.java  | 13 ++--
 .../SparkRunnerStreamingContextFactory.java |  3 +
 .../streaming/StreamingTransformTranslator.java | 10 +--
 .../metrics/sink/NamedAggregatorsTest.java  | 15 +
 .../ResumeFromCheckpointStreamingTest.java  |  4 +-
 16 files changed, 156 insertions(+), 161 deletions(-)
--




[2/2] beam git commit: This closes #2161

2017-03-05 Thread staslevin
This closes #2161


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/59451bca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/59451bca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/59451bca

Branch: refs/heads/master
Commit: 59451bca60ef45b1daeb14b8194bac0f8bdcc98b
Parents: 11a381b 4febd95
Author: Stas Levin 
Authored: Sun Mar 5 17:51:59 2017 +0200
Committer: Stas Levin 
Committed: Sun Mar 5 17:51:59 2017 +0200

--
 .../apache/beam/runners/spark/translation/BoundedDataset.java  | 6 ++
 1 file changed, 2 insertions(+), 4 deletions(-)
--




[1/2] beam git commit: [BEAM-1625] BoundedDataset action() does not materialize RDD

2017-03-05 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master 11a381b23 -> 59451bca6


[BEAM-1625] BoundedDataset action() does not materialize RDD


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4febd954
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4febd954
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4febd954

Branch: refs/heads/master
Commit: 4febd954af00458032efbee45b7f9724fe0ea9ed
Parents: 11a381b
Author: Aviem Zur 
Authored: Sun Mar 5 16:17:35 2017 +0200
Committer: Aviem Zur 
Committed: Sun Mar 5 16:17:35 2017 +0200

--
 .../apache/beam/runners/spark/translation/BoundedDataset.java  | 6 ++
 1 file changed, 2 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/4febd954/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
index 5e19846..7db04a8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.spark.translation;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
-import java.util.Iterator;
 import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
@@ -107,9 +106,8 @@ public class BoundedDataset<T> implements Dataset {
   @Override
   public void action() {
     // Empty function to force computation of RDD.
-    rdd.foreachPartition(new VoidFunction<Iterator<WindowedValue<T>>>() {
-      @Override
-      public void call(Iterator<WindowedValue<T>> windowedValueIterator) throws Exception {
+    rdd.foreach(new VoidFunction<WindowedValue<T>>() {
+      @Override public void call(WindowedValue<T> tWindowedValue) throws Exception {
         // Empty implementation.
       }
     });
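Why the change matters: with foreachPartition the Spark job still runs, but the empty callback never consumes the partition iterator, so lazily computed elements are never actually produced; foreach forces every element through. A minimal self-contained sketch of the difference (not runner code):

import java.util.Iterator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;

class MaterializeSketch {
  // Runs a job but computes nothing per element: the iterator is never
  // advanced, so upstream per-element work may never happen.
  static <T> void noOpAction(final JavaRDD<T> rdd) {
    rdd.foreachPartition(new VoidFunction<Iterator<T>>() {
      @Override
      public void call(final Iterator<T> it) throws Exception {
        // never calls it.next()
      }
    });
  }

  // Forces each element to be produced, which is what action() needs.
  static <T> void materializingAction(final JavaRDD<T> rdd) {
    rdd.foreach(new VoidFunction<T>() {
      @Override
      public void call(final T element) throws Exception {
        // Empty: merely receiving the element forces its computation.
      }
    });
  }
}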



[2/2] beam git commit: This closes #2083

2017-02-27 Thread staslevin
This closes #2083


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3082178b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3082178b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3082178b

Branch: refs/heads/master
Commit: 3082178b3a35143573caa2b3d9afbf2babb5b2b5
Parents: 4bba380 b74f8fa
Author: Stas Levin 
Authored: Mon Feb 27 14:16:55 2017 +0200
Committer: Stas Levin 
Committed: Mon Feb 27 14:16:55 2017 +0200

--
 .../runners/spark/metrics/SparkBeamMetric.java  | 13 -
 .../spark/metrics/SparkBeamMetricTest.java  | 60 
 2 files changed, 70 insertions(+), 3 deletions(-)
--




[1/2] beam git commit: Remove periods from step name in SparkBeamMetric to avoid inconsistent hierarchies in Graphite metric sink

2017-02-27 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master 4bba380cb -> 3082178b3


Remove periods from step name in SparkBeamMetric to avoid inconsistent 
hierarchies in Graphite metric sink


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b74f8fa3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b74f8fa3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b74f8fa3

Branch: refs/heads/master
Commit: b74f8fa30aea130f990c152a1c91a509c3b6d6c9
Parents: 4bba380
Author: Aviem Zur 
Authored: Thu Feb 23 07:09:55 2017 +0200
Committer: Stas Levin 
Committed: Mon Feb 27 14:15:43 2017 +0200

--
 .../runners/spark/metrics/SparkBeamMetric.java  | 13 -
 .../spark/metrics/SparkBeamMetricTest.java  | 60 
 2 files changed, 70 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/b74f8fa3/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
index 8e31b22..8328a1a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkBeamMetric.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.spark.metrics;
 
 import com.codahale.metrics.Metric;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.sdk.metrics.DistributionResult;
@@ -33,6 +34,7 @@ import org.apache.beam.sdk.metrics.MetricsFilter;
  */
 class SparkBeamMetric implements Metric {
   private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9\\._-]";
+  private static final String ILLEGAL_CHARACTERS_AND_PERIOD = "[^A-Za-z0-9_-]";
 
   private final SparkMetricResults metricResults = new SparkMetricResults();
 
@@ -54,9 +56,14 @@ class SparkBeamMetric implements Metric {
     return metrics;
   }
 
-  private String renderName(MetricResult<?> metricResult) {
+  @VisibleForTesting
+  String renderName(MetricResult<?> metricResult) {
+    String renderedStepName = metricResult.step().replaceAll(ILLEGAL_CHARACTERS_AND_PERIOD, "_");
+    if (renderedStepName.endsWith("_")) {
+      renderedStepName = renderedStepName.substring(0, renderedStepName.length() - 1);
+    }
     MetricName metricName = metricResult.name();
-    String rendered = metricResult.step() + "." + metricName.namespace() + "." + metricName.name();
-    return rendered.replaceAll(ILLEGAL_CHARACTERS, "_");
+    return (renderedStepName + "." + metricName.namespace() + "." + metricName.name())
+        .replaceAll(ILLEGAL_CHARACTERS, "_");
   }
 }
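
To make the renaming concrete, a standalone sketch that mirrors the patched logic
(class name and sample values are hypothetical; the regexes are copied from the diff):

    public class RenderNameSketch {
      private static final String ILLEGAL_CHARACTERS = "[^A-Za-z0-9\\._-]";
      private static final String ILLEGAL_CHARACTERS_AND_PERIOD = "[^A-Za-z0-9_-]";

      // Mirrors SparkBeamMetric.renderName after this change: periods inside the
      // step name become underscores, so Graphite sees one hierarchy level per
      // step rather than one per period.
      static String renderName(String step, String namespace, String name) {
        String renderedStepName = step.replaceAll(ILLEGAL_CHARACTERS_AND_PERIOD, "_");
        if (renderedStepName.endsWith("_")) {
          renderedStepName = renderedStepName.substring(0, renderedStepName.length() - 1);
        }
        return (renderedStepName + "." + namespace + "." + name)
            .replaceAll(ILLEGAL_CHARACTERS, "_");
      }

      public static void main(String[] args) {
        // Prints "MyStep_Inner.beam.counter": the step's period and trailing
        // slash are flattened, while the namespace/name separators are kept.
        System.out.println(renderName("MyStep.Inner/", "beam", "counter"));
      }
    }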

http://git-wip-us.apache.org/repos/asf/beam/blob/b74f8fa3/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricTest.java
--
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricTest.java
new file mode 100644
index 0000000..9426b2c
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/metrics/SparkBeamMetricTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.metrics;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.junit.Test;
+
+
+/**
+ * Test SparkBeamMetric.
+ */
+public class SparkBeamMetricTest {
+  @Test
+  public void testRenderName() throws Exception {
+MetricResult metricResult = new MetricResult() {
+  @Override
+  public 

[2/2] beam git commit: This closes #2089

2017-02-26 Thread staslevin
This closes #2089


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4bba380c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4bba380c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4bba380c

Branch: refs/heads/master
Commit: 4bba380cb011e38bd78c1ec899a6672a6b02fd05
Parents: 38a01e9 acc1013
Author: Stas Levin 
Authored: Sun Feb 26 17:28:26 2017 +0200
Committer: Stas Levin 
Committed: Sun Feb 26 17:28:26 2017 +0200

--
 .../apache/beam/sdk/testing/TestPipeline.java   | 42 +++-
 1 file changed, 32 insertions(+), 10 deletions(-)
--




[2/3] beam-site git commit: Regenerate website

2017-02-24 Thread staslevin
Regenerate website


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/1a607ad8
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/1a607ad8
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/1a607ad8

Branch: refs/heads/asf-site
Commit: 1a607ad8ea7667addbfe27e097a1d4ca912bf15a
Parents: 3c0c532
Author: Stas Levin 
Authored: Fri Feb 24 17:36:38 2017 +0200
Committer: Stas Levin 
Committed: Fri Feb 24 17:36:38 2017 +0200

--
 content/contribute/testing/index.html | 157 +
 1 file changed, 157 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/beam-site/blob/1a607ad8/content/contribute/testing/index.html
--
diff --git a/content/contribute/testing/index.html b/content/contribute/testing/index.html
index 92a0e1f..9b41686 100644
--- a/content/contribute/testing/index.html
+++ b/content/contribute/testing/index.html
@@ -173,6 +173,8 @@
   Testing Systems
   E2E Testing Framework
   RunnableOnService Tests
+  Effective use of the TestPipeline JUnit rule
+  API Surface testing
 
   
 
@@ -542,6 +544,161 @@ which enables test authors to write simple functionality verification. They are
 meant to use some of the built-in utilities of the SDK, namely PAssert, to
 verify that the simple pipelines they run end in the correct state.
 
+Effective use of the TestPipeline JUnit rule
+
+TestPipeline is a JUnit rule designed to facilitate testing pipelines.
+In combination with PAssert, the two can be used for testing and
+writing assertions over pipelines. However, in order for these assertions
+to be effective, the constructed pipeline must be run by a pipeline
+runner. If the pipeline is not run (i.e., executed), the constructed
+PAssert statements will not be triggered, and will thus be ineffective.
+
+To prevent such cases, TestPipeline has some protection mechanisms in place.
+
+Abandoned node detection (performed automatically)
+
+Abandoned nodes are PTransforms, PAsserts included, that were not
+executed by the pipeline runner. Abandoned nodes are most likely to occur
+due to one of the following scenarios:
+
+  Lack of a pipeline.run() statement at the end of a test.
+  Addition of PTransforms after the pipeline has already run.
+
+Abandoned node detection is automatically enabled when a real pipeline
+runner (i.e. not a CrashingRunner) and/or a
+@NeedsRunner / @RunnableOnService annotation are detected.
+
+Consider the following test:
+
+// Note the @Rule annotation here
+@Rule
+public final transient TestPipeline pipeline = TestPipeline.create();
+
+@Test
+@Category(NeedsRunner.class)
+public void myPipelineTest() throws Exception {
+
+  final PCollection<String> pCollection =
+      pipeline
+          .apply("Create", Create.of(WORDS).withCoder(StringUtf8Coder.of()))
+          .apply(
+              "Map1",
+              MapElements.via(
+                  new SimpleFunction<String, String>() {
+
+                    @Override
+                    public String apply(final String input) {
+                      return WHATEVER;
+                    }
+                  }));
+
+  PAssert.that(pCollection).containsInAnyOrder(WHATEVER);
+
+  /* ERROR: pipeline.run() is missing, PAsserts are ineffective */
+}
+
+# Unsupported in Beam's Python SDK.
+
+The PAssert at the end of this test method will not be executed, since
+pipeline is never run, making this test ineffective. If this test method
+is run using an actual pipeline runner, an exception will be thrown
+indicating that there was no run() invocation in the test.
+
+Exceptions that are thrown prior to executing a pipeline will fail
+the test unless handled by an ExpectedException rule.
+
+Consider the following test:
+
+// Note the @Rule annotation here
+@Rule
+public final transient TestPipeline pipeline = TestPipeline.create();
+
+@Test
+public void testReadingFailsTableDoesNotExist() throws Exception {
+  final String table = "TEST-TABLE";
+
+  BigtableIO.Read read =
+      BigtableIO.read()
+          .withBigtableOptions(BIGTABLE_OPTIONS)
+          .withTableId(table)
+          .withBigtableService(service);
+
+  // Exception will be thrown by read.validate() when read is applied.
+  thrown.expect(IllegalArgumentException.class);
+  thrown.expectMessage(String.format("Table %s does not exist", table));
+
+  pipeline.apply(read);
+}
+
+# Unsupported in Beam's Python SDK.
+
+The application of the read transform throws an exception, which is then
+handled by the thrown ExpectedException rule.
+In light of this exception, the fact that this test has abandoned nodes
+(the read transform) does not play a role, since the test fails before
+the pipeline would have been executed.

[3/3] beam-site git commit: This closes #162

2017-02-24 Thread staslevin
This closes #162


Project: http://git-wip-us.apache.org/repos/asf/beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam-site/commit/408f290e
Tree: http://git-wip-us.apache.org/repos/asf/beam-site/tree/408f290e
Diff: http://git-wip-us.apache.org/repos/asf/beam-site/diff/408f290e

Branch: refs/heads/asf-site
Commit: 408f290ebe6270e8f80ef4e73bdb47931a4f7a74
Parents: 3fd0ed6 1a607ad
Author: Stas Levin 
Authored: Fri Feb 24 17:38:28 2017 +0200
Committer: Stas Levin 
Committed: Fri Feb 24 17:38:28 2017 +0200

--
 content/contribute/testing/index.html | 157 +
 src/contribute/testing.md | 155 
 2 files changed, 312 insertions(+)
--




[2/2] beam git commit: This closes #1706

2017-02-16 Thread staslevin
This closes #1706


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f03f6ac1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f03f6ac1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f03f6ac1

Branch: refs/heads/master
Commit: f03f6ac192dcc5f1ba92d9639b153a3afba3340d
Parents: 5fe7844 50daea2
Author: Stas Levin 
Authored: Thu Feb 16 11:18:24 2017 +0200
Committer: Stas Levin 
Committed: Thu Feb 16 11:18:24 2017 +0200

--
 .../apache/beam/sdk/testing/Annotations.java|  72 +++
 .../apache/beam/sdk/testing/TestPipeline.java   |  66 ++-
 .../apache/beam/sdk/metrics/MetricsTest.java|   6 +-
 .../beam/sdk/testing/TestPipelineTest.java  | 504 +++
 .../org/apache/beam/sdk/io/mqtt/MqttIOTest.java |   9 +-
 5 files changed, 417 insertions(+), 240 deletions(-)
--




[1/2] beam git commit: [BEAM-1205] Auto set "enableAbandonedNodeEnforcement" in TestPipeline

2017-02-16 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master 5fe78440b -> f03f6ac19


[BEAM-1205] Auto set "enableAbandonedNodeEnforcement" in TestPipeline


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/50daea28
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/50daea28
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/50daea28

Branch: refs/heads/master
Commit: 50daea288b9c5df2b481e5e6bea153796c03830a
Parents: 5fe7844
Author: Stas Levin 
Authored: Thu Dec 22 19:13:01 2016 +0200
Committer: Stas Levin 
Committed: Thu Feb 16 11:18:09 2017 +0200

--
 .../apache/beam/sdk/testing/Annotations.java|  72 +++
 .../apache/beam/sdk/testing/TestPipeline.java   |  66 ++-
 .../apache/beam/sdk/metrics/MetricsTest.java|   6 +-
 .../beam/sdk/testing/TestPipelineTest.java  | 504 +++
 .../org/apache/beam/sdk/io/mqtt/MqttIOTest.java |   9 +-
 5 files changed, 417 insertions(+), 240 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/50daea28/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/Annotations.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/Annotations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/Annotations.java
new file mode 100644
index 0000000..e560226
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/Annotations.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.beam.sdk.testing;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import java.lang.annotation.Annotation;
+import java.util.Arrays;
+import javax.annotation.Nonnull;
+import org.junit.experimental.categories.Category;
+
+/**
+ * A utility class for querying annotations.
+ */
+class Annotations {
+
+  /**
+   * Annotation predicates.
+   */
+  static class Predicates {
+
+    static Predicate<Annotation> isAnnotationOfType(final Class<? extends Annotation> clazz) {
+      return new Predicate<Annotation>() {
+
+        @Override
+        public boolean apply(@Nonnull final Annotation annotation) {
+          return annotation.annotationType() != null
+              && annotation.annotationType().equals(clazz);
+        }
+      };
+    }
+
+    static Predicate<Annotation> isCategoryOf(final Class<?> value, final boolean allowDerived) {
+      return new Predicate<Annotation>() {
+
+        @Override
+        public boolean apply(@Nonnull final Annotation category) {
+          return
+              FluentIterable
+                  .from(Arrays.asList(((Category) category).value()))
+                  .anyMatch(new Predicate<Class<?>>() {
+
+                    @Override
+                    public boolean apply(final Class<?> aClass) {
+                      return
+                          allowDerived
+                              ? value.isAssignableFrom(aClass)
+                              : value.equals(aClass);
+                    }
+                  });
+        }
+      };
+    }
+  }
+}
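
A hedged usage sketch of these predicates (the helper class below is hypothetical; since
Annotations is package-private, the sketch lives in the same package): detecting whether
a test method carries @Category(NeedsRunner.class).

    package org.apache.beam.sdk.testing;

    import com.google.common.collect.FluentIterable;
    import java.lang.annotation.Annotation;
    import java.util.Arrays;
    import org.junit.experimental.categories.Category;

    class CategoryDetectionSketch {
      /** Returns true if any of the given annotations is a @Category listing the given class. */
      static boolean hasCategory(final Annotation[] annotations, final Class<?> category) {
        return FluentIterable.from(Arrays.asList(annotations))
            // keep only @Category annotations...
            .filter(Annotations.Predicates.isAnnotationOfType(Category.class))
            // ...and check whether the category (or, with allowDerived, a subtype) is listed
            .anyMatch(Annotations.Predicates.isCategoryOf(category, true));
      }
    }

    // e.g. hasCategory(testMethod.getAnnotations(), NeedsRunner.class)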

http://git-wip-us.apache.org/repos/asf/beam/blob/50daea28/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 37c809a..02eefa9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.testing;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.TreeNode;
 import com.fasterxml.jackson.databind.JsonNode;
@@ -93,15 +95,18 @@ public class TestPipeline extends Pipeline implements TestRule {
 
   private static class 
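
For context, a hedged sketch of the manual opt-in this change automates (the setter name
comes from the commit title; the test body is illustrative):

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void enforcementExample() throws Exception {
      // Before this change, abandoned-node enforcement had to be enabled by hand;
      // the commit instead infers it from the runner in use (a real runner vs.
      // CrashingRunner) and the @NeedsRunner / @RunnableOnService categories.
      pipeline.enableAbandonedNodeEnforcement(true);

      // ... build the pipeline, attach PAsserts ...
      pipeline.run();
    }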

[2/2] beam git commit: This closes #1991

2017-02-13 Thread staslevin
This closes #1991


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4db4fb2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4db4fb2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4db4fb2

Branch: refs/heads/master
Commit: d4db4fb2c20e46beac1a1fc0ef47ec30d9a2bd34
Parents: 4beed50 8014a6e
Author: Stas Levin 
Authored: Mon Feb 13 15:18:07 2017 +0200
Committer: Stas Levin 
Committed: Mon Feb 13 15:18:07 2017 +0200

--
 .../spark/src/test/resources/log4j.properties   | 30 
 1 file changed, 30 insertions(+)
--




[2/2] beam git commit: This closes #1986

2017-02-13 Thread staslevin
This closes #1986


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4beed50f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4beed50f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4beed50f

Branch: refs/heads/master
Commit: 4beed50f27a745ec38c14ea4a4ed88c0a346f485
Parents: 93ae666 2f63b1f
Author: Stas Levin 
Authored: Mon Feb 13 12:45:54 2017 +0200
Committer: Stas Levin 
Committed: Mon Feb 13 12:45:54 2017 +0200

--
 .../streaming/ResumeFromCheckpointStreamingTest.java | 11 +--
 1 file changed, 5 insertions(+), 6 deletions(-)
--




[1/2] beam git commit: Fixed javadoc comment in ResumeFromCheckpointStreamingTest.

2017-02-13 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master 93ae666be -> 4beed50f2


Fixed javadoc comment in ResumeFromCheckpointStreamingTest.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2f63b1f6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2f63b1f6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2f63b1f6

Branch: refs/heads/master
Commit: 2f63b1f63fefaff996c4ae267af8a01a5718e0c0
Parents: 93ae666
Author: Aviem Zur 
Authored: Sun Feb 12 19:56:48 2017 +0200
Committer: Stas Levin 
Committed: Mon Feb 13 12:40:23 2017 +0200

--
 .../streaming/ResumeFromCheckpointStreamingTest.java | 11 +--
 1 file changed, 5 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/2f63b1f6/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
--
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 7094c86..5a27b29 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -64,12 +64,7 @@ import org.junit.rules.TemporaryFolder;
 
 
 /**
- * Tests DStream recovery from checkpoint - recreate the job and continue (from checkpoint).
- *
- * Tests Aggregators, which rely on Accumulators - Aggregators should be available, though
- * state is not preserved (Spark issue), so they start from initial value.
- * //TODO: after the runner supports recovering the state of Aggregators, update this test's
- * expected values for the recovered (second) run.
+ * Test pipelines which are resumed from checkpoint.
  */
 public class ResumeFromCheckpointStreamingTest {
   private static final EmbeddedKafkaCluster.EmbeddedZookeeper 
EMBEDDED_ZOOKEEPER =
@@ -118,6 +113,10 @@ public class ResumeFromCheckpointStreamingTest {
 }
   }
 
+  /**
+   * Tests DStream recovery from checkpoint - recreate the job and continue (from checkpoint).
+   * Also tests Aggregator values, which should be restored upon recovery from checkpoint.
+   */
   @Test
   public void testRun() throws Exception {
 Duration batchIntervalDuration = Duration.standardSeconds(5);



[1/2] beam git commit: [BEAM-882, BEAM-883, BEAM-878] Simplified API surface verifications.

2017-02-09 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master e21f9ae86 -> 6b31c14fa


[BEAM-882,BEAM-883,BEAM-878] Simplified API surface verifications.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cde550fe
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cde550fe
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cde550fe

Branch: refs/heads/master
Commit: cde550fe9b90be5fc9548735d5854359fca6a9cf
Parents: e21f9ae
Author: Stas Levin 
Authored: Mon Jan 16 16:20:25 2017 +0200
Committer: Stas Levin 
Committed: Thu Feb 9 17:40:36 2017 +0200

--
 .../org/apache/beam/sdk/util/ApiSurface.java| 446 +--
 .../org/apache/beam/SdkCoreApiSurfaceTest.java  |  62 +++
 .../apache/beam/sdk/util/ApiSurfaceTest.java| 152 ++-
 .../apache/beam/sdk/io/gcp/ApiSurfaceTest.java  | 134 --
 .../beam/sdk/io/gcp/GcpApiSurfaceTest.java  |  79 
 5 files changed, 495 insertions(+), 378 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/cde550fe/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ApiSurface.java
--
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ApiSurface.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ApiSurface.java
index 2040161..9530e88 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ApiSurface.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ApiSurface.java
@@ -17,12 +17,21 @@
  */
 package org.apache.beam.sdk.util;
 
+import static org.hamcrest.Matchers.anyOf;
+
+import com.google.common.base.Function;
 import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
 import com.google.common.base.Supplier;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
 import com.google.common.reflect.ClassPath;
 import com.google.common.reflect.ClassPath.ClassInfo;
@@ -45,15 +54,20 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.regex.Pattern;
+import javax.annotation.Nonnull;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.StringDescription;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Represents the API surface of a package prefix. Used for accessing public classes,
- * methods, and the types they reference, to control what dependencies are re-exported.
+ * Represents the API surface of a package prefix. Used for accessing public classes, methods, and
+ * the types they reference, to control what dependencies are re-exported.
  *
- * For the purposes of calculating the public API surface, exposure includes any public
- * or protected occurrence of:
+ * For the purposes of calculating the public API surface, exposure includes any public or
+ * protected occurrence of:
  *
  * 
  * superclasses
@@ -66,45 +80,277 @@ import org.slf4j.LoggerFactory;
  * wildcard bounds
  * 
  *
- * Exposure is a transitive property. The resulting map excludes primitives
- * and array classes themselves.
+ * Exposure is a transitive property. The resulting map excludes primitives and array classes
+ * themselves.
  *
- * It is prudent (though not required) to prune prefixes like "java" via the builder
- * method {@link #pruningPrefix} to halt the traversal so it does not uselessly catalog references
- * that are not interesting.
+ * It is prudent (though not required) to prune prefixes like "java" via the builder method
+ * {@link #pruningPrefix} to halt the traversal so it does not uselessly catalog references that are
+ * not interesting.
  */
 @SuppressWarnings("rawtypes")
 public class ApiSurface {
   private static final Logger LOG = LoggerFactory.getLogger(ApiSurface.class);
 
+  /** A factory method to create a {@link Class} matcher for classes residing in a given package. */
+  public static Matcher<Class<?>> classesInPackage(final String packageName) {
+    return new Matchers.ClassInPackage(packageName);
+  }
+
   /**
-   * Returns an empty {@link ApiSurface}.
+   * A factory method to create an {@link ApiSurface} matcher, producing a positive match if the
+   * queried api surface contains ONLY classes described by the provided matchers.
*/
-  public static ApiSurface empty() {
-LOG.debug("Returning an empty 
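
A hedged sketch of how the matcher-based verification reads in a test. classesInPackage
appears in the diff above; ofPackage, pruningPattern, and containsOnlyClassesMatching are
assumptions based on the test files this commit touches:

    @Test
    public void testApiSurface() throws Exception {
      // The API surface of the package under test...
      final ApiSurface apiSurface =
          ApiSurface.ofPackage("org.apache.beam.sdk.io.gcp")
              .pruningPattern("org[.]apache[.]beam[.].*Test.*");

      // ...should expose ONLY classes from the whitelisted packages.
      assertThat(
          apiSurface,
          containsOnlyClassesMatching(
              ImmutableSet.of(
                  classesInPackage("org.apache.beam"),
                  classesInPackage("com.google.api.services.bigquery"))));
    }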

[2/2] beam git commit: This closes #1921

2017-02-05 Thread staslevin
This closes #1921


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c442ef81
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c442ef81
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c442ef81

Branch: refs/heads/master
Commit: c442ef81aa5bfbe84c0e3344ed8dc1d15d6e9a36
Parents: e5afbb2 882c654
Author: Stas Levin 
Authored: Sun Feb 5 15:51:27 2017 +0200
Committer: Stas Levin 
Committed: Sun Feb 5 15:51:27 2017 +0200

--
 .../runners/spark/aggregators/NamedAggregators.java|  6 --
 .../aggregators/metrics/sink/NamedAggregatorsTest.java | 13 +
 2 files changed, 17 insertions(+), 2 deletions(-)
--




[1/2] beam git commit: [BEAM-1304] Checking for nullity before trying to obtain an aggregator's value.

2017-02-05 Thread staslevin
Repository: beam
Updated Branches:
  refs/heads/master e5afbb27f -> c442ef81a


[BEAM-1304] Checking for nullity before trying to obtain an aggregator's value.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/882c654b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/882c654b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/882c654b

Branch: refs/heads/master
Commit: 882c654b1a8aefd2e4281d786448734731db7816
Parents: e5afbb2
Author: Stas Levin 
Authored: Sun Feb 5 12:17:35 2017 +0200
Committer: Stas Levin 
Committed: Sun Feb 5 15:51:18 2017 +0200

--
 .../runners/spark/aggregators/NamedAggregators.java|  6 --
 .../aggregators/metrics/sink/NamedAggregatorsTest.java | 13 +
 2 files changed, 17 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/beam/blob/882c654b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
--
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
index b5aec32..c876c07 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/aggregators/NamedAggregators.java
@@ -68,10 +68,12 @@ public class NamedAggregators implements Serializable {
    * @param name  Name of aggregator to retrieve.
    * @param typeClass Type class to cast the value to.
    * @param <T> Type to be returned.
-   * @return the value of the aggregator associated with the specified name
+   * @return the value of the aggregator associated with the specified name,
+   * or null if the specified aggregator could not be found.
    */
   public <T> T getValue(String name, Class<T> typeClass) {
-    return typeClass.cast(mNamedAggregators.get(name).render());
+    final State state = mNamedAggregators.get(name);
+    return state != null ? typeClass.cast(state.render()) : null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/882c654b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
--
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
index 3b5dd21..8646510 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
@@ -28,10 +28,13 @@ import java.util.List;
 import java.util.Set;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.aggregators.ClearAggregatorsRule;
+import org.apache.beam.runners.spark.aggregators.SparkAggregators;
 import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import 
org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.MapElements;
@@ -95,4 +98,14 @@ public class NamedAggregatorsTest {
 assertThat(InMemoryMetrics.valueOf("emptyLines"), is(1d));
 
   }
+
+  @Test
+  public void testNonExistingAggregatorName() throws Exception {
+    final SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+    final Long valueOf =
+        SparkAggregators.valueOf(
+            "myMissingAggregator", Long.class, SparkContextFactory.getSparkContext(options));
+
+    assertThat(valueOf, is(nullValue()));
+  }
 }
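
A short caller-side sketch of the effect (aggregator name hypothetical): looking up an
aggregator that was never registered now yields null instead of a NullPointerException,
so callers can treat the aggregator as absent.

    // namedAggregators is a NamedAggregators instance.
    final Long value = namedAggregators.getValue("myMissingAggregator", Long.class);
    if (value == null) {
      // The aggregator was never registered (e.g., its step never executed);
      // before this change the same lookup threw a NullPointerException.
    }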