Repository: beam Updated Branches: refs/heads/master 1c6861f22 -> 97957ea0e
Fix compile error that occurs in some JDKs Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b7ae7ecf Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b7ae7ecf Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b7ae7ecf Branch: refs/heads/master Commit: b7ae7ecffcd08b6a0ccc8296210d36b90306c171 Parents: 1c6861f Author: Mark Liu <mark...@google.com> Authored: Wed Jun 7 16:27:34 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Fri Jun 9 10:54:12 2017 -0700 ---------------------------------------------------------------------- .../runners/flink/FlinkStreamingTransformTranslators.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b7ae7ecf/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index ef46b63..fef32de 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -363,8 +363,13 @@ class FlinkStreamingTransformTranslators { Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags = Maps.newHashMap(); for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) { if (!tagsToOutputTags.containsKey(entry.getKey())) { - tagsToOutputTags.put(entry.getKey(), new OutputTag<>(entry.getKey().getId(), - (TypeInformation) context.getTypeInfo((PCollection<?>) entry.getValue()))); + tagsToOutputTags.put( + entry.getKey(), + new OutputTag<WindowedValue<?>>( + 
entry.getKey().getId(), + (TypeInformation) context.getTypeInfo((PCollection<?>) entry.getValue()) + ) + ); } }