Add RawUnion code to FlinkDoFnFunction

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5780fc5e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5780fc5e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5780fc5e

Branch: refs/heads/master
Commit: 5780fc5e8cd64911d9612d89896b9d68be4f621f
Parents: a0444b8
Author: Dan Halperin <dhalp...@google.com>
Authored: Wed May 31 10:50:46 2017 -0700
Committer: Dan Halperin <dhalp...@google.com>
Committed: Wed May 31 11:36:16 2017 -0700

----------------------------------------------------------------------
 .../runners/flink/translation/functions/FlinkDoFnFunction.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5780fc5e/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index ab2ac6b..d8ed622 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -146,7 +146,9 @@ public class FlinkDoFnFunction<InputT, OutputT>
     @Override
     @SuppressWarnings("unchecked")
     public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-      collector.collect(output);
+      collector.collect(
+          WindowedValue.of(new RawUnionValue(0 /* single output */, output.getValue()),
+          output.getTimestamp(), output.getWindows(), output.getPane()));
     }
   }
 

Reply via email to