Keep throws in StreamingEvaluationContext cancel() and waitUntilFinish
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b37526ea Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b37526ea Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b37526ea Branch: refs/heads/master Commit: b37526ea53f82f294eaed8e258c357a46ff01cd6 Parents: c92f986 Author: Pei He <pe...@google.com> Authored: Mon Oct 17 11:41:41 2016 -0700 Committer: Pei He <pe...@google.com> Committed: Tue Oct 18 10:57:49 2016 -0700 ---------------------------------------------------------------------- .../spark/translation/EvaluationContext.java | 2 +- .../streaming/StreamingEvaluationContext.java | 22 ++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b37526ea/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index 83ef3c5..c1c65dd 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -309,7 +309,7 @@ public class EvaluationContext implements EvaluationResult { @Override public State waitUntilFinish(Duration duration) throws IOException, InterruptedException { - // This is no-op, since Spark runner is blocking. + // This is no-op, since Spark runner in batch is blocking. // It needs to be updated once SparkRunner supports non-blocking execution: // https://issues.apache.org/jira/browse/BEAM-595 return State.DONE; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b37526ea/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java index 0b32dfd..2652f2b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingEvaluationContext.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.spark.translation.streaming; import com.google.common.collect.Iterables; +import java.io.IOException; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; @@ -45,6 +46,7 @@ import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.joda.time.Duration; /** @@ -198,6 +200,26 @@ public class StreamingEvaluationContext extends EvaluationContext { return state; } + @Override + public State cancel() throws IOException { + throw new UnsupportedOperationException( + "Spark runner StreamingEvaluationContext does not support cancel."); + } + + @Override + public State waitUntilFinish() + throws IOException, InterruptedException { + throw new UnsupportedOperationException( + "Spark runner StreamingEvaluationContext does not support waitUntilFinish."); + } + + @Override + public State waitUntilFinish(Duration duration) + throws IOException, InterruptedException { + throw new UnsupportedOperationException( + "Spark runner StreamingEvaluationContext does not support waitUntilFinish."); + } + //---------------- override in order to expose in package @Override protected <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {