[hotfix] Clean broadcast functions when translating. This closes #5477.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/010f44c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/010f44c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/010f44c7

Branch: refs/heads/master
Commit: 010f44c714907d6d0cb957ac9bea036cbc34c436
Parents: cd6fe1c
Author: kkloudas <kklou...@gmail.com>
Authored: Tue Feb 13 15:13:50 2018 +0100
Committer: kkloudas <kklou...@gmail.com>
Committed: Tue Feb 13 17:34:05 2018 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/datastream/BroadcastConnectedStream.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/010f44c7/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
index f3c4838..e5454ef 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
@@ -161,7 +161,7 @@ public class BroadcastConnectedStream<IN1, IN2> {
 				"A KeyedBroadcastProcessFunction can only be used with a keyed stream as the second input.");
 
 		TwoInputStreamOperator<IN1, IN2, OUT> operator =
-				new CoBroadcastWithKeyedOperator<>(function, broadcastStateDescriptors);
+				new CoBroadcastWithKeyedOperator<>(clean(function), broadcastStateDescriptors);
 
 		return transform("Co-Process-Broadcast-Keyed", outTypeInfo, operator);
 	}
@@ -212,7 +212,7 @@ public class BroadcastConnectedStream<IN1, IN2> {
 				"A BroadcastProcessFunction can only be used with a non-keyed stream as the second input.");
 
 		TwoInputStreamOperator<IN1, IN2, OUT> operator =
-				new CoBroadcastWithNonKeyedOperator<>(function, broadcastStateDescriptors);
+				new CoBroadcastWithNonKeyedOperator<>(clean(function), broadcastStateDescriptors);
 
 		return transform("Co-Process-Broadcast", outTypeInfo, operator);
 	}