Repository: beam
Updated Branches:
  refs/heads/master 1c6861f22 -> 97957ea0e


Fix compile error occurs in some JDKs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b7ae7ecf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b7ae7ecf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b7ae7ecf

Branch: refs/heads/master
Commit: b7ae7ecffcd08b6a0ccc8296210d36b90306c171
Parents: 1c6861f
Author: Mark Liu <mark...@google.com>
Authored: Wed Jun 7 16:27:34 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 9 10:54:12 2017 -0700

----------------------------------------------------------------------
 .../runners/flink/FlinkStreamingTransformTranslators.java   | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b7ae7ecf/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index ef46b63..fef32de 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -363,8 +363,13 @@ class FlinkStreamingTransformTranslators {
       Map<TupleTag<?>, OutputTag<WindowedValue<?>>> tagsToOutputTags = 
Maps.newHashMap();
       for (Map.Entry<TupleTag<?>, PValue> entry : outputs.entrySet()) {
         if (!tagsToOutputTags.containsKey(entry.getKey())) {
-          tagsToOutputTags.put(entry.getKey(), new 
OutputTag<>(entry.getKey().getId(),
-              (TypeInformation) context.getTypeInfo((PCollection<?>) 
entry.getValue())));
+          tagsToOutputTags.put(
+              entry.getKey(),
+              new OutputTag<WindowedValue<?>>(
+                  entry.getKey().getId(),
+                  (TypeInformation) context.getTypeInfo((PCollection<?>) 
entry.getValue())
+              )
+          );
         }
       }
 

Reply via email to