[BEAM-333][flink] make bounded/unbounded sources stoppable
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e2820b0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e2820b0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e2820b0 Branch: refs/heads/master Commit: 7e2820b06c19d958cbf7316ae28def7fe796a360 Parents: be689df Author: Maximilian Michels <m...@apache.org> Authored: Tue Sep 6 16:38:43 2016 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Fri Sep 9 16:06:42 2016 +0200 ---------------------------------------------------------------------- .../wrappers/streaming/io/BoundedSourceWrapper.java | 9 ++++++++- .../wrappers/streaming/io/UnboundedSourceWrapper.java | 8 +++++++- 2 files changed, 15 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e2820b0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java index 3cb93c0..df49a49 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.watermark.Watermark; @@ -37,7 +38,8 @@ import org.slf4j.LoggerFactory; * Wrapper for executing {@link BoundedSource UnboundedSources} as a Flink Source. */ public class BoundedSourceWrapper<OutputT> - extends RichParallelSourceFunction<WindowedValue<OutputT>> { + extends RichParallelSourceFunction<WindowedValue<OutputT>> + implements StoppableFunction { private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class); @@ -206,6 +208,11 @@ public class BoundedSourceWrapper<OutputT> isRunning = false; } + @Override + public void stop() { + this.isRunning = false; + } + /** * Visible so that we can check this in tests. Must not be used for anything else. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e2820b0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index 8647322..debf52f 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.flink.api.common.functions.StoppableFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; @@ -53,7 +54,7 @@ import org.slf4j.LoggerFactory; public class UnboundedSourceWrapper< OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> extends RichParallelSourceFunction<WindowedValue<OutputT>> - implements Triggerable, Checkpointed<byte[]> { + implements Triggerable, StoppableFunction, Checkpointed<byte[]> { private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class); @@ -311,6 +312,11 @@ public class UnboundedSourceWrapper< } @Override + public void stop() { + isRunning = false; + } + + @Override public byte[] snapshotState(long l, long l1) throws Exception { if (checkpointCoder == null) {