[BEAM-1726] Fix Flatten with input copies in Flink Streaming Runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e2bb180 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e2bb180 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e2bb180 Branch: refs/heads/master Commit: 0e2bb1808350cbebf771d0971deb06787732800d Parents: 7c44935 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Sun Mar 19 07:49:08 2017 +0100 Committer: Aviem Zur <aviem...@gmail.com> Committed: Thu May 4 20:48:56 2017 +0300 ---------------------------------------------------------------------- .../FlinkStreamingTransformTranslators.java | 26 ++++++++++++++++++++ 1 file changed, 26 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0e2bb180/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index c024493..7339c01 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -966,10 +966,36 @@ class FlinkStreamingTransformTranslators { } else { DataStream<T> result = null; + + // Determine DataStreams that we use as input several times. For those, we need to uniquify + // input streams because Flink seems to swallow watermarks when we have a union of one and + // the same stream. + Map<DataStream<T>, Integer> duplicates = new HashMap<>(); + for (PValue input : allInputs.values()) { + DataStream<T> current = context.getInputDataStream(input); + Integer oldValue = duplicates.put(current, 1); + if (oldValue != null) { + duplicates.put(current, oldValue + 1); + } + } + for (PValue input : allInputs.values()) { DataStream<T> current = context.getInputDataStream(input); + + final Integer timesRequired = duplicates.get(current); + if (timesRequired > 1) { + current = current.flatMap(new FlatMapFunction<T, T>() { + private static final long serialVersionUID = 1L; + + @Override + public void flatMap(T t, Collector<T> collector) throws Exception { + collector.collect(t); + } + }); + } result = (result == null) ? current : result.union(current); } + context.setOutputDataStream(context.getOutput(transform), result); } }