[hotfix] Clean broadcast functions when translating.

This closes #5477.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/010f44c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/010f44c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/010f44c7

Branch: refs/heads/master
Commit: 010f44c714907d6d0cb957ac9bea036cbc34c436
Parents: cd6fe1c
Author: kkloudas <kklou...@gmail.com>
Authored: Tue Feb 13 15:13:50 2018 +0100
Committer: kkloudas <kklou...@gmail.com>
Committed: Tue Feb 13 17:34:05 2018 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/datastream/BroadcastConnectedStream.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/010f44c7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
index f3c4838..e5454ef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
@@ -161,7 +161,7 @@ public class BroadcastConnectedStream<IN1, IN2> {
                                "A KeyedBroadcastProcessFunction can only be 
used with a keyed stream as the second input.");
 
                TwoInputStreamOperator<IN1, IN2, OUT> operator =
-                               new CoBroadcastWithKeyedOperator<>(function, 
broadcastStateDescriptors);
+                               new 
CoBroadcastWithKeyedOperator<>(clean(function), broadcastStateDescriptors);
                return transform("Co-Process-Broadcast-Keyed", outTypeInfo, 
operator);
        }
 
@@ -212,7 +212,7 @@ public class BroadcastConnectedStream<IN1, IN2> {
                                "A BroadcastProcessFunction can only be used 
with a non-keyed stream as the second input.");
 
                TwoInputStreamOperator<IN1, IN2, OUT> operator =
-                               new CoBroadcastWithNonKeyedOperator<>(function, 
broadcastStateDescriptors);
+                               new 
CoBroadcastWithNonKeyedOperator<>(clean(function), broadcastStateDescriptors);
                return transform("Co-Process-Broadcast", outTypeInfo, operator);
        }
 

Reply via email to