[FLINK-9152] Harmonize BroadcastProcessFunction Context names

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/584229dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/584229dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/584229dc

Branch: refs/heads/master
Commit: 584229dc00e62a6a8540b059645269bb04d0ba04
Parents: d5d6842
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Mon Apr 9 16:08:58 2018 -0700
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu Apr 12 08:04:30 2018 -0700

----------------------------------------------------------------------
 .../co/KeyedBroadcastProcessFunction.java       | 16 ++++++-------
 .../co/CoBroadcastWithKeyedOperator.java        |  6 +++--
 .../flink/streaming/api/DataStreamTest.java     |  4 ++--
 .../co/CoBroadcastWithKeyedOperatorTest.java    | 24 ++++++++++----------
 .../api/scala/BroadcastStateITCase.scala        |  4 ++--
 .../streaming/runtime/BroadcastStateITCase.java |  4 ++--
 6 files changed, 30 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
index 6e6ae5c..9263be0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
@@ -39,9 +39,9 @@ import org.apache.flink.util.Collector;
  *
  * <p>The user has to implement two methods:
  * <ol>
- *     <li>the {@link #processBroadcastElement(Object, KeyedContext, Collector)} which will be applied to
+ *     <li>the {@link #processBroadcastElement(Object, Context, Collector)} which will be applied to
  *     each element in the broadcast side
- *     <li> and the {@link #processElement(Object, KeyedReadOnlyContext, Collector)} which will be applied to the
+ *     <li> and the {@link #processElement(Object, ReadOnlyContext, Collector)} which will be applied to the
  *     non-broadcasted/keyed side.
  * </ol>
  *
@@ -71,7 +71,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
         * The context is only valid during the invocation of this method, do not store it.
         *
         * @param value The stream element.
-        * @param ctx A {@link KeyedReadOnlyContext} that allows querying the timestamp of the element,
+        * @param ctx A {@link ReadOnlyContext} that allows querying the timestamp of the element,
         *            querying the current processing/event time and iterating the broadcast state
         *            with <b>read-only</b> access.
         *            The context is only valid during the invocation of this method, do not store it.
@@ -79,7 +79,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
         * @throws Exception The function may throw exceptions which cause the streaming program
         *                   to fail and go into recovery.
         */
-       public abstract void processElement(final IN1 value, final KeyedReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
+       public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
 
        /**
         * This method is called for each element in the
@@ -102,7 +102,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
         * @throws Exception The function may throw exceptions which cause the streaming program
         *                   to fail and go into recovery.
         */
-       public abstract void processBroadcastElement(final IN2 value, final KeyedContext ctx, final Collector<OUT> out) throws Exception;
+       public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
 
        /**
         * Called when a timer set using {@link TimerService} fires.
@@ -130,7 +130,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
         * this also allows to apply a {@link KeyedStateFunction} to the (local) states of all active keys
         * in the your backend.
         */
-       public abstract class KeyedContext extends Context {
+       public abstract class Context extends BaseBroadcastProcessFunction.Context {
 
                /**
                 * Applies the provided {@code function} to the state
@@ -152,7 +152,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
         * this also allows to get a <b>read-only</b> {@link Iterable} over the elements stored in the
         * broadcast state and a {@link TimerService} for querying time and registering timers.
         */
-       public abstract class KeyedReadOnlyContext extends ReadOnlyContext {
+       public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext {
 
                /**
                 * A {@link TimerService} for querying time and registering 
timers.
@@ -163,7 +163,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, 
IN1, IN2, OUT> extends B
        /**
         * Information available in an invocation of {@link #onTimer(long, 
OnTimerContext, Collector)}.
         */
-       public abstract class OnTimerContext extends KeyedReadOnlyContext {
+       public abstract class OnTimerContext extends ReadOnlyContext {
 
                /**
                 * The {@link TimeDomain} of the firing timer, i.e. if it is

http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
index 871363b..5f7bbe2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.SimpleTimerService;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction.ReadOnlyContext;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
@@ -141,7 +142,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
                onTimerContext.timer = null;
        }
 
-       private class ReadWriteContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.KeyedContext {
+       private class ReadWriteContextImpl
+                       extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.Context {
 
                private final ExecutionConfig config;
 
@@ -220,7 +222,7 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
                }
        }
 
-       private class ReadOnlyContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.KeyedReadOnlyContext {
+       private class ReadOnlyContextImpl extends ReadOnlyContext {
 
                private final ExecutionConfig config;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 6326672..4d2d6e1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -879,12 +879,12 @@ public class DataStreamTest extends TestLogger {
                bcStream.process(
                                new KeyedBroadcastProcessFunction<String, Long, 
String, String>() {
                                        @Override
-                                       public void 
processBroadcastElement(String value, KeyedContext ctx, Collector<String> out) 
throws Exception {
+                                       public void 
processBroadcastElement(String value, Context ctx, Collector<String> out) 
throws Exception {
                                                // do nothing
                                        }
 
                                        @Override
-                                       public void processElement(Long value, 
KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+                                       public void processElement(Long value, 
ReadOnlyContext ctx, Collector<String> out) throws Exception {
                                                // do nothing
                                        }
                                });

http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
index b923b75..c3692d5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
@@ -139,7 +139,7 @@ public class CoBroadcastWithKeyedOperatorTest {
                }
 
                @Override
-               public void processBroadcastElement(Integer value, KeyedContext 
ctx, Collector<String> out) throws Exception {
+               public void processBroadcastElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
                        // put an element in the broadcast state
                        ctx.applyToKeyedState(
                                        listStateDesc,
@@ -158,7 +158,7 @@ public class CoBroadcastWithKeyedOperatorTest {
                }
 
                @Override
-               public void processElement(String value, KeyedReadOnlyContext 
ctx, Collector<String> out) throws Exception {
+               public void processElement(String value, ReadOnlyContext ctx, 
Collector<String> out) throws Exception {
                        
getRuntimeContext().getListState(listStateDesc).add(value);
                }
        }
@@ -216,12 +216,12 @@ public class CoBroadcastWithKeyedOperatorTest {
                }
 
                @Override
-               public void processBroadcastElement(Integer value, KeyedContext 
ctx, Collector<String> out) throws Exception {
+               public void processBroadcastElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
                        out.collect("BR:" + value + " WM:" + 
ctx.currentWatermark() + " TS:" + ctx.timestamp());
                }
 
                @Override
-               public void processElement(String value, KeyedReadOnlyContext 
ctx, Collector<String> out) throws Exception {
+               public void processElement(String value, ReadOnlyContext ctx, 
Collector<String> out) throws Exception {
                        ctx.timerService().registerEventTimeTimer(timerTS);
                        out.collect("NON-BR:" + value + " WM:" + 
ctx.currentWatermark() + " TS:" + ctx.timestamp());
                }
@@ -289,12 +289,12 @@ public class CoBroadcastWithKeyedOperatorTest {
                };
 
                @Override
-               public void processBroadcastElement(Integer value, KeyedContext 
ctx, Collector<String> out) throws Exception {
+               public void processBroadcastElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
                        ctx.output(BROADCAST_TAG, "BR:" + value + " WM:" + 
ctx.currentWatermark() + " TS:" + ctx.timestamp());
                }
 
                @Override
-               public void processElement(String value, KeyedReadOnlyContext 
ctx, Collector<String> out) throws Exception {
+               public void processElement(String value, ReadOnlyContext ctx, 
Collector<String> out) throws Exception {
                        ctx.output(NON_BROADCAST_TAG, "NON-BR:" + value + " 
WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
                }
        }
@@ -380,14 +380,14 @@ public class CoBroadcastWithKeyedOperatorTest {
                }
 
                @Override
-               public void processBroadcastElement(Integer value, KeyedContext 
ctx, Collector<String> out) throws Exception {
+               public void processBroadcastElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
                        // put an element in the broadcast state
                        final String key = value + "." + keyPostfix;
                        ctx.getBroadcastState(STATE_DESCRIPTOR).put(key, value);
                }
 
                @Override
-               public void processElement(String value, KeyedReadOnlyContext 
ctx, Collector<String> out) throws Exception {
+               public void processElement(String value, ReadOnlyContext ctx, 
Collector<String> out) throws Exception {
                        Iterable<Map.Entry<String, Integer>> broadcastStateIt = 
ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries();
                        Iterator<Map.Entry<String, Integer>> iter = 
broadcastStateIt.iterator();
 
@@ -621,7 +621,7 @@ public class CoBroadcastWithKeyedOperatorTest {
                }
 
                @Override
-               public void processBroadcastElement(Integer value, KeyedContext 
ctx, Collector<String> out) throws Exception {
+               public void processBroadcastElement(Integer value, Context ctx, 
Collector<String> out) throws Exception {
                        // put an element in the broadcast state
                        for (String k : keysToRegister) {
                                ctx.getBroadcastState(STATE_DESCRIPTOR).put(k, 
value);
@@ -629,7 +629,7 @@ public class CoBroadcastWithKeyedOperatorTest {
                }
 
                @Override
-               public void processElement(String value, KeyedReadOnlyContext 
ctx, Collector<String> out) throws Exception {
+               public void processElement(String value, ReadOnlyContext ctx, 
Collector<String> out) throws Exception {
                        for (Map.Entry<String, Integer> entry : 
ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries()) {
                                out.collect(entry.toString());
                        }
@@ -652,12 +652,12 @@ public class CoBroadcastWithKeyedOperatorTest {
                                                        private final 
ValueStateDescriptor<String> valueState = new ValueStateDescriptor<>("any", 
BasicTypeInfo.STRING_TYPE_INFO);
 
                                                        @Override
-                                                       public void 
processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) 
throws Exception {
+                                                       public void 
processBroadcastElement(Integer value, Context ctx, Collector<String> out) 
throws Exception {
                                                                
getRuntimeContext().getState(valueState).value(); // this should fail
                                                        }
 
                                                        @Override
-                                                       public void 
processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) 
throws Exception {
+                                                       public void 
processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws 
Exception {
                                                                // do nothing
                                                        }
                                                })

http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
index 55bb3ba..f1bfced 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
@@ -109,7 +109,7 @@ class TestBroadcastProcessFunction(
   @throws[Exception]
   override def processElement(
       value: Long,
-      ctx: KeyedBroadcastProcessFunction[Long, Long, String, 
String]#KeyedReadOnlyContext,
+      ctx: KeyedBroadcastProcessFunction[Long, Long, String, 
String]#ReadOnlyContext,
       out: Collector[String]): Unit = {
 
     val currentTime = nextTimerTimestamp
@@ -121,7 +121,7 @@ class TestBroadcastProcessFunction(
   @throws[Exception]
   override def processBroadcastElement(
       value: String,
-      ctx: KeyedBroadcastProcessFunction[Long, Long, String, 
String]#KeyedContext,
+      ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#Context,
       out: Collector[String]): Unit = {
 
     val key = value.split(":")(1).toLong

http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
index 7ccba33..8f442c0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
@@ -167,7 +167,7 @@ public class BroadcastStateITCase {
                }
 
                @Override
-               public void processElement(Long value, KeyedReadOnlyContext 
ctx, Collector<String> out) throws Exception {
+               public void processElement(Long value, ReadOnlyContext ctx, 
Collector<String> out) throws Exception {
                        long currentTime = nextTimerTimestamp;
                        nextTimerTimestamp++;
                        ctx.timerService().registerEventTimeTimer(currentTime);
@@ -175,7 +175,7 @@ public class BroadcastStateITCase {
                }
 
                @Override
-               public void processBroadcastElement(String value, KeyedContext 
ctx, Collector<String> out) throws Exception {
+               public void processBroadcastElement(String value, Context ctx, 
Collector<String> out) throws Exception {
                        long key = Long.parseLong(value.split(":")[1]);
                        ctx.getBroadcastState(descriptor).put(key, value);
                }

Reply via email to