[FLINK-9152] Use in-class Context objects in BroadcastProcessFunction

This brings it in line with KeyedBroadcastProcessFunction, which uses
context objects defined in KeyedBroadcastProcessFunction. The context
objects here have no added functionality but we still define them here
so that the methods don't refer to the base class implementations for
consistency.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24a8f518
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24a8f518
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24a8f518

Branch: refs/heads/release-1.5
Commit: 24a8f5186cec112feb06ca1d38282e5a3cd9f085
Parents: 51beea6
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Mon Apr 9 16:12:43 2018 -0700
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu Apr 12 08:08:32 2018 -0700

----------------------------------------------------------------------
 .../api/functions/co/BroadcastProcessFunction.java      | 12 ++++++++++++
 .../operators/co/CoBroadcastWithNonKeyedOperator.java   |  4 ++--
 2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24a8f518/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
index 257ea83..9e5540e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
@@ -90,4 +90,16 @@ public abstract class BroadcastProcessFunction<IN1, IN2, 
OUT> extends BaseBroadc
         *                   to fail and go into recovery.
         */
        public abstract void processBroadcastElement(IN2 value, Context ctx, 
Collector<OUT> out) throws Exception;
+
+       /**
+        * A {@link BaseBroadcastProcessFunction.Context context} available to 
the broadcast side of
+        * a {@link 
org.apache.flink.streaming.api.datastream.BroadcastConnectedStream}.
+        */
+       public abstract class Context extends 
BaseBroadcastProcessFunction.Context {}
+
+       /**
+        * A {@link BaseBroadcastProcessFunction.Context context} available to 
the non-keyed side of
+        * a {@link 
org.apache.flink.streaming.api.datastream.BroadcastConnectedStream} (if any).
+        */
+       public abstract class ReadOnlyContext extends 
BaseBroadcastProcessFunction.ReadOnlyContext {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24a8f518/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
index 7e1e431..5bed3bb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
-import 
org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
+import 
org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction.Context;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
@@ -113,7 +113,7 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
                currentWatermark = mark.getTimestamp();
        }
 
-       private class ReadWriteContextImpl extends 
BaseBroadcastProcessFunction.Context {
+       private class ReadWriteContextImpl extends Context {
 
                private final ExecutionConfig config;
 

Reply via email to