Repository: flink
Updated Branches:
  refs/heads/master 528cca5cf -> 010f44c71
[FLINK-8642] Initialize descriptors before use at getBroadcastState(). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd6fe1c8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd6fe1c8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd6fe1c8 Branch: refs/heads/master Commit: cd6fe1c84fa97208a3cfd0c02be41c9799ba973a Parents: 528cca5 Author: kkloudas <kklou...@gmail.com> Authored: Tue Feb 13 11:46:02 2018 +0100 Committer: kkloudas <kklou...@gmail.com> Committed: Tue Feb 13 17:32:33 2018 +0100 ---------------------------------------------------------------------- .../examples/broadcast/BroadcastExample.scala | 5 ++-- .../co/CoBroadcastWithKeyedOperator.java | 25 +++++++++++++++++--- .../co/CoBroadcastWithNonKeyedOperator.java | 17 +++++++++++-- .../flink/streaming/api/scala/DataStream.scala | 4 ++-- .../api/scala/BroadcastStateITCase.scala | 4 ++-- .../streaming/runtime/BroadcastStateITCase.java | 24 ++++++++++++++----- 6 files changed, 62 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/cd6fe1c8/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/broadcast/BroadcastExample.scala ---------------------------------------------------------------------- diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/broadcast/BroadcastExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/broadcast/BroadcastExample.scala index fff3c39..d158738 100644 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/broadcast/BroadcastExample.scala +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/broadcast/BroadcastExample.scala @@ -68,9 +68,10 @@ 
object BroadcastExample { .connect(broadcastStream) .process(new KeyedBroadcastProcessFunction[Int, (Int, Int), Int, String]() { - val valueState = new ValueStateDescriptor[String]("any", BasicTypeInfo.STRING_TYPE_INFO) + private lazy val valueState = new ValueStateDescriptor[String]( + "any", BasicTypeInfo.STRING_TYPE_INFO) - val mapStateDesc = new MapStateDescriptor[String, Integer]( + private lazy val mapStateDesc = new MapStateDescriptor[String, Integer]( "Broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO) @throws[Exception] http://git-wip-us.apache.org/repos/asf/flink/blob/cd6fe1c8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java index 4872c61..2bdb683 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators.co; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; @@ -99,9 +100,9 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT> broadcastStates.put(descriptor, getOperatorStateBackend().getBroadcastState(descriptor)); } - rwContext = new ReadWriteContextImpl(getKeyedStateBackend(), userFunction, broadcastStates, timerService); - rContext = new ReadOnlyContextImpl(userFunction, 
broadcastStates, timerService); - onTimerContext = new OnTimerContextImpl(userFunction, broadcastStates, timerService); + rwContext = new ReadWriteContextImpl(getExecutionConfig(), getKeyedStateBackend(), userFunction, broadcastStates, timerService); + rContext = new ReadOnlyContextImpl(getExecutionConfig(), userFunction, broadcastStates, timerService); + onTimerContext = new OnTimerContextImpl(getExecutionConfig(), userFunction, broadcastStates, timerService); } @Override @@ -142,6 +143,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT> private class ReadWriteContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.KeyedContext { + private final ExecutionConfig config; + private final KeyedStateBackend<KS> keyedStateBackend; private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states; @@ -151,12 +154,14 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT> private StreamRecord<IN2> element; ReadWriteContextImpl ( + final ExecutionConfig executionConfig, final KeyedStateBackend<KS> keyedStateBackend, final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function, final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates, final TimerService timerService) { function.super(); + this.config = Preconditions.checkNotNull(executionConfig); this.keyedStateBackend = Preconditions.checkNotNull(keyedStateBackend); this.states = Preconditions.checkNotNull(broadcastStates); this.timerService = Preconditions.checkNotNull(timerService); @@ -175,6 +180,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT> @Override public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) { Preconditions.checkNotNull(stateDescriptor); + + stateDescriptor.initializeSerializerUnlessSet(config); BroadcastState<K, V> state = (BroadcastState<K, V>) states.get(stateDescriptor); if (state == null) { throw new IllegalArgumentException("The requested state does not exist. 
" + @@ -215,6 +222,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT> private class ReadOnlyContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.KeyedReadOnlyContext { + private final ExecutionConfig config; + private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states; private final TimerService timerService; @@ -222,11 +231,13 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT> private StreamRecord<IN1> element; ReadOnlyContextImpl( + final ExecutionConfig executionConfig, final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function, final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates, final TimerService timerService) { function.super(); + this.config = Preconditions.checkNotNull(executionConfig); this.states = Preconditions.checkNotNull(broadcastStates); this.timerService = Preconditions.checkNotNull(timerService); } @@ -265,6 +276,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT> @Override public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) { Preconditions.checkNotNull(stateDescriptor); + + stateDescriptor.initializeSerializerUnlessSet(config); ReadOnlyBroadcastState<K, V> state = (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor); if (state == null) { throw new IllegalArgumentException("The requested state does not exist. 
" + @@ -277,6 +290,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT> private class OnTimerContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.OnTimerContext { + private final ExecutionConfig config; + private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states; private final TimerService timerService; @@ -286,11 +301,13 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT> private InternalTimer<KS, VoidNamespace> timer; OnTimerContextImpl( + final ExecutionConfig executionConfig, final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function, final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates, final TimerService timerService) { function.super(); + this.config = Preconditions.checkNotNull(executionConfig); this.states = Preconditions.checkNotNull(broadcastStates); this.timerService = Preconditions.checkNotNull(timerService); } @@ -331,6 +348,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT> @Override public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) { Preconditions.checkNotNull(stateDescriptor); + + stateDescriptor.initializeSerializerUnlessSet(config); ReadOnlyBroadcastState<K, V> state = (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor); if (state == null) { throw new IllegalArgumentException("The requested state does not exist. 
" + http://git-wip-us.apache.org/repos/asf/flink/blob/cd6fe1c8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java index 25bf873..7e1e431 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators.co; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; @@ -86,8 +87,8 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT> broadcastStates.put(descriptor, getOperatorStateBackend().getBroadcastState(descriptor)); } - rwContext = new ReadWriteContextImpl(userFunction, broadcastStates, getProcessingTimeService()); - rContext = new ReadOnlyContextImpl(userFunction, broadcastStates, getProcessingTimeService()); + rwContext = new ReadWriteContextImpl(getExecutionConfig(), userFunction, broadcastStates, getProcessingTimeService()); + rContext = new ReadOnlyContextImpl(getExecutionConfig(), userFunction, broadcastStates, getProcessingTimeService()); } @Override @@ -114,6 +115,8 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT> private class ReadWriteContextImpl extends BaseBroadcastProcessFunction.Context { + private final ExecutionConfig config; + private final Map<MapStateDescriptor<?, ?>, 
BroadcastState<?, ?>> states; private final ProcessingTimeService timerService; @@ -121,11 +124,13 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT> private StreamRecord<IN2> element; ReadWriteContextImpl( + final ExecutionConfig executionConfig, final BroadcastProcessFunction<IN1, IN2, OUT> function, final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates, final ProcessingTimeService timerService) { function.super(); + this.config = Preconditions.checkNotNull(executionConfig); this.states = Preconditions.checkNotNull(broadcastStates); this.timerService = Preconditions.checkNotNull(timerService); } @@ -143,6 +148,8 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT> @Override public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) { Preconditions.checkNotNull(stateDescriptor); + + stateDescriptor.initializeSerializerUnlessSet(config); BroadcastState<K, V> state = (BroadcastState<K, V>) states.get(stateDescriptor); if (state == null) { throw new IllegalArgumentException("The requested state does not exist. 
" + @@ -171,6 +178,8 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT> private class ReadOnlyContextImpl extends BroadcastProcessFunction<IN1, IN2, OUT>.ReadOnlyContext { + private final ExecutionConfig config; + private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states; private final ProcessingTimeService timerService; @@ -178,11 +187,13 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT> private StreamRecord<IN1> element; ReadOnlyContextImpl( + final ExecutionConfig executionConfig, final BroadcastProcessFunction<IN1, IN2, OUT> function, final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates, final ProcessingTimeService timerService) { function.super(); + this.config = Preconditions.checkNotNull(executionConfig); this.states = Preconditions.checkNotNull(broadcastStates); this.timerService = Preconditions.checkNotNull(timerService); } @@ -216,6 +227,8 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT> @Override public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) { Preconditions.checkNotNull(stateDescriptor); + + stateDescriptor.initializeSerializerUnlessSet(config); ReadOnlyBroadcastState<K, V> state = (ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor); if (state == null) { throw new IllegalArgumentException("The requested state does not exist. 
" + http://git-wip-us.apache.org/repos/asf/flink/blob/cd6fe1c8/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 9170940..3a88829 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -478,9 +478,9 @@ class DataStream[T](stream: JavaStream[T]) { @PublicEvolving def broadcast(broadcastStateDescriptors: MapStateDescriptor[_, _]*): BroadcastStream[T] = { if (broadcastStateDescriptors == null) { - throw new NullPointerException("Map function must not be null.") + throw new NullPointerException("State Descriptors must not be null.") } - stream.broadcast(broadcastStateDescriptors: _*) + javaStream.broadcast(broadcastStateDescriptors: _*) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/cd6fe1c8/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala index af883e1..6c382d5 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala @@ -41,7 +41,7 @@ class BroadcastStateITCase extends AbstractTestBase { val timerTimestamp = 100000L - val DESCRIPTOR = new MapStateDescriptor[Long, String]( + lazy val DESCRIPTOR = new 
MapStateDescriptor[Long, String]( "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], BasicTypeInfo.STRING_TYPE_INFO) @@ -98,7 +98,7 @@ class TestBroadcastProcessFunction( expectedBroadcastState: Map[Long, String]) extends KeyedBroadcastProcessFunction[Long, Long, String, String] { - val localDescriptor = new MapStateDescriptor[Long, String]( + lazy val localDescriptor = new MapStateDescriptor[Long, String]( "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], BasicTypeInfo.STRING_TYPE_INFO) http://git-wip-us.apache.org/repos/asf/flink/blob/cd6fe1c8/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java index 4b0b9c5..868aca9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java @@ -21,6 +21,7 @@ package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStream; @@ -47,6 +48,10 @@ public class BroadcastStateITCase { @Test public void testConnectWithBroadcastTranslation() throws Exception { + final MapStateDescriptor<Long, String> utterDescriptor = new MapStateDescriptor<>( + "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO + ); + 
final Map<Long, String> expected = new HashMap<>(); expected.put(0L, "test:0"); expected.put(1L, "test:1"); @@ -80,7 +85,7 @@ public class BroadcastStateITCase { } }); - final BroadcastStream<String> broadcast = srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR); + final BroadcastStream<String> broadcast = srcTwo.broadcast(utterDescriptor); // the timestamp should be high enough to trigger the timer after all the elements arrive. final DataStream<String> output = srcOne.connect(broadcast).process( @@ -143,9 +148,7 @@ public class BroadcastStateITCase { private final long timerTimestamp; - static final MapStateDescriptor<Long, String> DESCRIPTOR = new MapStateDescriptor<>( - "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO - ); + private transient MapStateDescriptor<Long, String> descriptor; TestBroadcastProcessFunction( final long timerTS, @@ -156,6 +159,15 @@ public class BroadcastStateITCase { } @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + descriptor = new MapStateDescriptor<>( + "broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO + ); + } + + @Override public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception { ctx.timerService().registerEventTimeTimer(timerTimestamp); } @@ -163,7 +175,7 @@ public class BroadcastStateITCase { @Override public void processBroadcastElement(String value, KeyedContext ctx, Collector<String> out) throws Exception { long key = Long.parseLong(value.split(":")[1]); - ctx.getBroadcastState(DESCRIPTOR).put(key, value); + ctx.getBroadcastState(descriptor).put(key, value); } @Override @@ -171,7 +183,7 @@ public class BroadcastStateITCase { Assert.assertEquals(timerTimestamp, timestamp); Map<Long, String> map = new HashMap<>(); - for (Map.Entry<Long, String> entry : ctx.getBroadcastState(DESCRIPTOR).immutableEntries()) { + for (Map.Entry<Long, String> entry : 
ctx.getBroadcastState(descriptor).immutableEntries()) { map.put(entry.getKey(), entry.getValue()); }