[FLINK-9152] Harmonize BroadcastProcessFunction Context names
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/584229dc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/584229dc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/584229dc Branch: refs/heads/master Commit: 584229dc00e62a6a8540b059645269bb04d0ba04 Parents: d5d6842 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Mon Apr 9 16:08:58 2018 -0700 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Thu Apr 12 08:04:30 2018 -0700 ---------------------------------------------------------------------- .../co/KeyedBroadcastProcessFunction.java | 16 ++++++------- .../co/CoBroadcastWithKeyedOperator.java | 6 +++-- .../flink/streaming/api/DataStreamTest.java | 4 ++-- .../co/CoBroadcastWithKeyedOperatorTest.java | 24 ++++++++++---------- .../api/scala/BroadcastStateITCase.scala | 4 ++-- .../streaming/runtime/BroadcastStateITCase.java | 4 ++-- 6 files changed, 30 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java index 6e6ae5c..9263be0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java @@ -39,9 +39,9 @@ import org.apache.flink.util.Collector; * * <p>The user has to implement two methods: * <ol> - * <li>the {@link #processBroadcastElement(Object, KeyedContext, Collector)} which will be applied to + * <li>the {@link #processBroadcastElement(Object, Context, Collector)} which will be applied to * each element in the broadcast side - * <li> and the {@link #processElement(Object, KeyedReadOnlyContext, Collector)} which will be applied to the + * <li> and the {@link #processElement(Object, ReadOnlyContext, Collector)} which will be applied to the * non-broadcasted/keyed side. * </ol> * @@ -71,7 +71,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B * The context is only valid during the invocation of this method, do not store it. * * @param value The stream element. - * @param ctx A {@link KeyedReadOnlyContext} that allows querying the timestamp of the element, + * @param ctx A {@link ReadOnlyContext} that allows querying the timestamp of the element, * querying the current processing/event time and iterating the broadcast state * with <b>read-only</b> access. * The context is only valid during the invocation of this method, do not store it. @@ -79,7 +79,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B * @throws Exception The function may throw exceptions which cause the streaming program * to fail and go into recovery. */ - public abstract void processElement(final IN1 value, final KeyedReadOnlyContext ctx, final Collector<OUT> out) throws Exception; + public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception; /** * This method is called for each element in the @@ -102,7 +102,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B * @throws Exception The function may throw exceptions which cause the streaming program * to fail and go into recovery. */ - public abstract void processBroadcastElement(final IN2 value, final KeyedContext ctx, final Collector<OUT> out) throws Exception; + public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception; /** * Called when a timer set using {@link TimerService} fires. @@ -130,7 +130,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B * this also allows to apply a {@link KeyedStateFunction} to the (local) states of all active keys * in the your backend. */ - public abstract class KeyedContext extends Context { + public abstract class Context extends BaseBroadcastProcessFunction.Context { /** * Applies the provided {@code function} to the state @@ -152,7 +152,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B * this also allows to get a <b>read-only</b> {@link Iterable} over the elements stored in the * broadcast state and a {@link TimerService} for querying time and registering timers. */ - public abstract class KeyedReadOnlyContext extends ReadOnlyContext { + public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext { /** * A {@link TimerService} for querying time and registering timers. @@ -163,7 +163,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B /** * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}. */ - public abstract class OnTimerContext extends KeyedReadOnlyContext { + public abstract class OnTimerContext extends ReadOnlyContext { /** * The {@link TimeDomain} of the firing timer, i.e. if it is http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java index 871363b..5f7bbe2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java @@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.SimpleTimerService; import org.apache.flink.streaming.api.TimeDomain; import org.apache.flink.streaming.api.TimerService; import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; +import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction.ReadOnlyContext; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; @@ -141,7 +142,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT> onTimerContext.timer = null; } - private class ReadWriteContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.KeyedContext { + private class ReadWriteContextImpl + extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.Context { private final ExecutionConfig config; @@ -220,7 +222,7 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT> } } - private class ReadOnlyContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.KeyedReadOnlyContext { + private class ReadOnlyContextImpl extends ReadOnlyContext { private final ExecutionConfig config; http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 6326672..4d2d6e1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -879,12 +879,12 @@ public class DataStreamTest extends TestLogger { bcStream.process( new KeyedBroadcastProcessFunction<String, Long, String, String>() { @Override - public void processBroadcastElement(String value, KeyedContext ctx, Collector<String> out) throws Exception { + public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception { // do nothing } @Override - public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception { + public void processElement(Long value, ReadOnlyContext ctx, Collector<String> out) throws Exception { // do nothing } }); http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java index b923b75..c3692d5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java @@ -139,7 +139,7 @@ public class CoBroadcastWithKeyedOperatorTest { } @Override - public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception { + public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception { // put an element in the broadcast state ctx.applyToKeyedState( listStateDesc, @@ -158,7 +158,7 @@ public class CoBroadcastWithKeyedOperatorTest { } @Override - public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception { + public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception { getRuntimeContext().getListState(listStateDesc).add(value); } } @@ -216,12 +216,12 @@ public class CoBroadcastWithKeyedOperatorTest { } @Override - public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception { + public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception { out.collect("BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp()); } @Override - public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception { + public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception { ctx.timerService().registerEventTimeTimer(timerTS); out.collect("NON-BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp()); } @@ -289,12 +289,12 @@ public class CoBroadcastWithKeyedOperatorTest { }; @Override - public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception { + public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception { ctx.output(BROADCAST_TAG, "BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp()); } @Override - public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception { + public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception { ctx.output(NON_BROADCAST_TAG, "NON-BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp()); } } @@ -380,14 +380,14 @@ public class CoBroadcastWithKeyedOperatorTest { } @Override - public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception { + public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception { // put an element in the broadcast state final String key = value + "." + keyPostfix; ctx.getBroadcastState(STATE_DESCRIPTOR).put(key, value); } @Override - public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception { + public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception { Iterable<Map.Entry<String, Integer>> broadcastStateIt = ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries(); Iterator<Map.Entry<String, Integer>> iter = broadcastStateIt.iterator(); @@ -621,7 +621,7 @@ public class CoBroadcastWithKeyedOperatorTest { } @Override - public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception { + public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception { // put an element in the broadcast state for (String k : keysToRegister) { ctx.getBroadcastState(STATE_DESCRIPTOR).put(k, value); @@ -629,7 +629,7 @@ public class CoBroadcastWithKeyedOperatorTest { } @Override - public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception { + public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception { for (Map.Entry<String, Integer> entry : ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries()) { out.collect(entry.toString()); } @@ -652,12 +652,12 @@ public class CoBroadcastWithKeyedOperatorTest { private final ValueStateDescriptor<String> valueState = new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO); @Override - public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception { + public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception { getRuntimeContext().getState(valueState).value(); // this should fail } @Override - public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception { + public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception { // do nothing } }) http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala index 55bb3ba..f1bfced 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala @@ -109,7 +109,7 @@ class TestBroadcastProcessFunction( @throws[Exception] override def processElement( value: Long, - ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#KeyedReadOnlyContext, + ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#ReadOnlyContext, out: Collector[String]): Unit = { val currentTime = nextTimerTimestamp @@ -121,7 +121,7 @@ class TestBroadcastProcessFunction( @throws[Exception] override def processBroadcastElement( value: String, - ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#KeyedContext, + ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#Context, out: Collector[String]): Unit = { val key = value.split(":")(1).toLong http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java index 7ccba33..8f442c0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java @@ -167,7 +167,7 @@ public class BroadcastStateITCase { } @Override - public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception { + public void processElement(Long value, ReadOnlyContext ctx, Collector<String> out) throws Exception { long currentTime = nextTimerTimestamp; nextTimerTimestamp++; ctx.timerService().registerEventTimeTimer(currentTime); @@ -175,7 +175,7 @@ public class BroadcastStateITCase { } @Override - public void processBroadcastElement(String value, KeyedContext ctx, Collector<String> out) throws Exception { + public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception { long key = Long.parseLong(value.split(":")[1]); ctx.getBroadcastState(descriptor).put(key, value); }