Repository: flink
Updated Branches:
  refs/heads/master 528cca5cf -> 010f44c71


[FLINK-8642] Initialize descriptors before use at getBroadcastState().


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cd6fe1c8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cd6fe1c8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cd6fe1c8

Branch: refs/heads/master
Commit: cd6fe1c84fa97208a3cfd0c02be41c9799ba973a
Parents: 528cca5
Author: kkloudas <kklou...@gmail.com>
Authored: Tue Feb 13 11:46:02 2018 +0100
Committer: kkloudas <kklou...@gmail.com>
Committed: Tue Feb 13 17:32:33 2018 +0100

----------------------------------------------------------------------
 .../examples/broadcast/BroadcastExample.scala   |  5 ++--
 .../co/CoBroadcastWithKeyedOperator.java        | 25 +++++++++++++++++---
 .../co/CoBroadcastWithNonKeyedOperator.java     | 17 +++++++++++--
 .../flink/streaming/api/scala/DataStream.scala  |  4 ++--
 .../api/scala/BroadcastStateITCase.scala        |  4 ++--
 .../streaming/runtime/BroadcastStateITCase.java | 24 ++++++++++++++-----
 6 files changed, 62 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cd6fe1c8/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/broadcast/BroadcastExample.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/broadcast/BroadcastExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/broadcast/BroadcastExample.scala
index fff3c39..d158738 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/broadcast/BroadcastExample.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/broadcast/BroadcastExample.scala
@@ -68,9 +68,10 @@ object BroadcastExample {
       .connect(broadcastStream)
       .process(new KeyedBroadcastProcessFunction[Int, (Int, Int), Int, 
String]() {
 
-        val valueState = new ValueStateDescriptor[String]("any", 
BasicTypeInfo.STRING_TYPE_INFO)
+        private lazy val valueState = new ValueStateDescriptor[String](
+          "any", BasicTypeInfo.STRING_TYPE_INFO)
 
-        val mapStateDesc = new MapStateDescriptor[String, Integer](
+        private lazy val mapStateDesc = new MapStateDescriptor[String, 
Integer](
           "Broadcast", BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO)
 
         @throws[Exception]

http://git-wip-us.apache.org/repos/asf/flink/blob/cd6fe1c8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
index 4872c61..2bdb683 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators.co;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
@@ -99,9 +100,9 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
                        broadcastStates.put(descriptor, 
getOperatorStateBackend().getBroadcastState(descriptor));
                }
 
-               rwContext = new ReadWriteContextImpl(getKeyedStateBackend(), 
userFunction, broadcastStates, timerService);
-               rContext = new ReadOnlyContextImpl(userFunction, 
broadcastStates, timerService);
-               onTimerContext = new OnTimerContextImpl(userFunction, 
broadcastStates, timerService);
+               rwContext = new ReadWriteContextImpl(getExecutionConfig(), 
getKeyedStateBackend(), userFunction, broadcastStates, timerService);
+               rContext = new ReadOnlyContextImpl(getExecutionConfig(), 
userFunction, broadcastStates, timerService);
+               onTimerContext = new OnTimerContextImpl(getExecutionConfig(), 
userFunction, broadcastStates, timerService);
        }
 
        @Override
@@ -142,6 +143,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
 
        private class ReadWriteContextImpl extends 
KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.KeyedContext {
 
+               private final ExecutionConfig config;
+
                private final KeyedStateBackend<KS> keyedStateBackend;
 
                private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, 
?>> states;
@@ -151,12 +154,14 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, 
OUT>
                private StreamRecord<IN2> element;
 
                ReadWriteContextImpl (
+                               final ExecutionConfig executionConfig,
                                final KeyedStateBackend<KS> keyedStateBackend,
                                final KeyedBroadcastProcessFunction<KS, IN1, 
IN2, OUT> function,
                                final Map<MapStateDescriptor<?, ?>, 
BroadcastState<?, ?>> broadcastStates,
                                final TimerService timerService) {
 
                        function.super();
+                       this.config = 
Preconditions.checkNotNull(executionConfig);
                        this.keyedStateBackend = 
Preconditions.checkNotNull(keyedStateBackend);
                        this.states = 
Preconditions.checkNotNull(broadcastStates);
                        this.timerService = 
Preconditions.checkNotNull(timerService);
@@ -175,6 +180,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
                @Override
                public <K, V> BroadcastState<K, V> 
getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
                        Preconditions.checkNotNull(stateDescriptor);
+
+                       stateDescriptor.initializeSerializerUnlessSet(config);
                        BroadcastState<K, V> state = (BroadcastState<K, V>) 
states.get(stateDescriptor);
                        if (state == null) {
                                throw new IllegalArgumentException("The 
requested state does not exist. " +
@@ -215,6 +222,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
 
        private class ReadOnlyContextImpl extends 
KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.KeyedReadOnlyContext {
 
+               private final ExecutionConfig config;
+
                private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, 
?>> states;
 
                private final TimerService timerService;
@@ -222,11 +231,13 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, 
OUT>
                private StreamRecord<IN1> element;
 
                ReadOnlyContextImpl(
+                               final ExecutionConfig executionConfig,
                                final KeyedBroadcastProcessFunction<KS, IN1, 
IN2, OUT> function,
                                final Map<MapStateDescriptor<?, ?>, 
BroadcastState<?, ?>> broadcastStates,
                                final TimerService timerService) {
 
                        function.super();
+                       this.config = 
Preconditions.checkNotNull(executionConfig);
                        this.states = 
Preconditions.checkNotNull(broadcastStates);
                        this.timerService = 
Preconditions.checkNotNull(timerService);
                }
@@ -265,6 +276,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
                @Override
                public  <K, V> ReadOnlyBroadcastState<K, V> 
getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
                        Preconditions.checkNotNull(stateDescriptor);
+
+                       stateDescriptor.initializeSerializerUnlessSet(config);
                        ReadOnlyBroadcastState<K, V> state = 
(ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
                        if (state == null) {
                                throw new IllegalArgumentException("The 
requested state does not exist. " +
@@ -277,6 +290,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
 
        private class OnTimerContextImpl extends 
KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.OnTimerContext {
 
+               private final ExecutionConfig config;
+
                private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, 
?>> states;
 
                private final TimerService timerService;
@@ -286,11 +301,13 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, 
OUT>
                private InternalTimer<KS, VoidNamespace> timer;
 
                OnTimerContextImpl(
+                               final ExecutionConfig executionConfig,
                                final KeyedBroadcastProcessFunction<KS, IN1, 
IN2, OUT> function,
                                final Map<MapStateDescriptor<?, ?>, 
BroadcastState<?, ?>> broadcastStates,
                                final TimerService timerService) {
 
                        function.super();
+                       this.config = 
Preconditions.checkNotNull(executionConfig);
                        this.states = 
Preconditions.checkNotNull(broadcastStates);
                        this.timerService = 
Preconditions.checkNotNull(timerService);
                }
@@ -331,6 +348,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
                @Override
                public <K, V> ReadOnlyBroadcastState<K, V> 
getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
                        Preconditions.checkNotNull(stateDescriptor);
+
+                       stateDescriptor.initializeSerializerUnlessSet(config);
                        ReadOnlyBroadcastState<K, V> state = 
(ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
                        if (state == null) {
                                throw new IllegalArgumentException("The 
requested state does not exist. " +

http://git-wip-us.apache.org/repos/asf/flink/blob/cd6fe1c8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
index 25bf873..7e1e431 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators.co;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
@@ -86,8 +87,8 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
                        broadcastStates.put(descriptor, 
getOperatorStateBackend().getBroadcastState(descriptor));
                }
 
-               rwContext = new ReadWriteContextImpl(userFunction, 
broadcastStates, getProcessingTimeService());
-               rContext = new ReadOnlyContextImpl(userFunction, 
broadcastStates, getProcessingTimeService());
+               rwContext = new ReadWriteContextImpl(getExecutionConfig(), 
userFunction, broadcastStates, getProcessingTimeService());
+               rContext = new ReadOnlyContextImpl(getExecutionConfig(), 
userFunction, broadcastStates, getProcessingTimeService());
        }
 
        @Override
@@ -114,6 +115,8 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
 
        private class ReadWriteContextImpl extends 
BaseBroadcastProcessFunction.Context {
 
+               private final ExecutionConfig config;
+
                private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, 
?>> states;
 
                private final ProcessingTimeService timerService;
@@ -121,11 +124,13 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, 
OUT>
                private StreamRecord<IN2> element;
 
                ReadWriteContextImpl(
+                               final ExecutionConfig executionConfig,
                                final BroadcastProcessFunction<IN1, IN2, OUT> 
function,
                                final Map<MapStateDescriptor<?, ?>, 
BroadcastState<?, ?>> broadcastStates,
                                final ProcessingTimeService timerService) {
 
                        function.super();
+                       this.config = 
Preconditions.checkNotNull(executionConfig);
                        this.states = 
Preconditions.checkNotNull(broadcastStates);
                        this.timerService = 
Preconditions.checkNotNull(timerService);
                }
@@ -143,6 +148,8 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
                @Override
                public <K, V> BroadcastState<K, V> 
getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
                        Preconditions.checkNotNull(stateDescriptor);
+
+                       stateDescriptor.initializeSerializerUnlessSet(config);
                        BroadcastState<K, V> state = (BroadcastState<K, V>) 
states.get(stateDescriptor);
                        if (state == null) {
                                throw new IllegalArgumentException("The 
requested state does not exist. " +
@@ -171,6 +178,8 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
 
        private class ReadOnlyContextImpl extends BroadcastProcessFunction<IN1, 
IN2, OUT>.ReadOnlyContext {
 
+               private final ExecutionConfig config;
+
                private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, 
?>> states;
 
                private final ProcessingTimeService timerService;
@@ -178,11 +187,13 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, 
OUT>
                private StreamRecord<IN1> element;
 
                ReadOnlyContextImpl(
+                               final ExecutionConfig executionConfig,
                                final BroadcastProcessFunction<IN1, IN2, OUT> 
function,
                                final Map<MapStateDescriptor<?, ?>, 
BroadcastState<?, ?>> broadcastStates,
                                final ProcessingTimeService timerService) {
 
                        function.super();
+                       this.config = 
Preconditions.checkNotNull(executionConfig);
                        this.states = 
Preconditions.checkNotNull(broadcastStates);
                        this.timerService = 
Preconditions.checkNotNull(timerService);
                }
@@ -216,6 +227,8 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
                @Override
                public <K, V> ReadOnlyBroadcastState<K, V> 
getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) {
                        Preconditions.checkNotNull(stateDescriptor);
+
+                       stateDescriptor.initializeSerializerUnlessSet(config);
                        ReadOnlyBroadcastState<K, V> state = 
(ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
                        if (state == null) {
                                throw new IllegalArgumentException("The 
requested state does not exist. " +

http://git-wip-us.apache.org/repos/asf/flink/blob/cd6fe1c8/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 9170940..3a88829 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -478,9 +478,9 @@ class DataStream[T](stream: JavaStream[T]) {
   @PublicEvolving
   def broadcast(broadcastStateDescriptors: MapStateDescriptor[_, _]*): 
BroadcastStream[T] = {
     if (broadcastStateDescriptors == null) {
-      throw new NullPointerException("Map function must not be null.")
+      throw new NullPointerException("State Descriptors must not be null.")
     }
-    stream.broadcast(broadcastStateDescriptors: _*)
+    javaStream.broadcast(broadcastStateDescriptors: _*)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/cd6fe1c8/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
index af883e1..6c382d5 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
@@ -41,7 +41,7 @@ class BroadcastStateITCase extends AbstractTestBase {
 
     val timerTimestamp = 100000L
 
-    val DESCRIPTOR = new MapStateDescriptor[Long, String](
+    lazy val DESCRIPTOR = new MapStateDescriptor[Long, String](
       "broadcast-state",
       BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
       BasicTypeInfo.STRING_TYPE_INFO)
@@ -98,7 +98,7 @@ class TestBroadcastProcessFunction(
         expectedBroadcastState: Map[Long, String])
     extends KeyedBroadcastProcessFunction[Long, Long, String, String] {
 
-  val localDescriptor = new MapStateDescriptor[Long, String](
+  lazy val localDescriptor = new MapStateDescriptor[Long, String](
     "broadcast-state",
     BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
     BasicTypeInfo.STRING_TYPE_INFO)

http://git-wip-us.apache.org/repos/asf/flink/blob/cd6fe1c8/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
index 4b0b9c5..868aca9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.test.streaming.runtime;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.BroadcastStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -47,6 +48,10 @@ public class BroadcastStateITCase {
        @Test
        public void testConnectWithBroadcastTranslation() throws Exception {
 
+               final MapStateDescriptor<Long, String> utterDescriptor = new 
MapStateDescriptor<>(
+                               "broadcast-state", 
BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+               );
+
                final Map<Long, String> expected = new HashMap<>();
                expected.put(0L, "test:0");
                expected.put(1L, "test:1");
@@ -80,7 +85,7 @@ public class BroadcastStateITCase {
                                        }
                                });
 
-               final BroadcastStream<String> broadcast = 
srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR);
+               final BroadcastStream<String> broadcast = 
srcTwo.broadcast(utterDescriptor);
 
                // the timestamp should be high enough to trigger the timer 
after all the elements arrive.
                final DataStream<String> output = 
srcOne.connect(broadcast).process(
@@ -143,9 +148,7 @@ public class BroadcastStateITCase {
 
                private final long timerTimestamp;
 
-               static final MapStateDescriptor<Long, String> DESCRIPTOR = new 
MapStateDescriptor<>(
-                               "broadcast-state", 
BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
-               );
+               private transient MapStateDescriptor<Long, String> descriptor;
 
                TestBroadcastProcessFunction(
                                final long timerTS,
@@ -156,6 +159,15 @@ public class BroadcastStateITCase {
                }
 
                @Override
+               public void open(Configuration parameters) throws Exception {
+                       super.open(parameters);
+
+                       descriptor = new MapStateDescriptor<>(
+                                       "broadcast-state", 
BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+                       );
+               }
+
+               @Override
                public void processElement(Long value, KeyedReadOnlyContext 
ctx, Collector<String> out) throws Exception {
                        
ctx.timerService().registerEventTimeTimer(timerTimestamp);
                }
@@ -163,7 +175,7 @@ public class BroadcastStateITCase {
                @Override
                public void processBroadcastElement(String value, KeyedContext 
ctx, Collector<String> out) throws Exception {
                        long key = Long.parseLong(value.split(":")[1]);
-                       ctx.getBroadcastState(DESCRIPTOR).put(key, value);
+                       ctx.getBroadcastState(descriptor).put(key, value);
                }
 
                @Override
@@ -171,7 +183,7 @@ public class BroadcastStateITCase {
                        Assert.assertEquals(timerTimestamp, timestamp);
 
                        Map<Long, String> map = new HashMap<>();
-                       for (Map.Entry<Long, String> entry : 
ctx.getBroadcastState(DESCRIPTOR).immutableEntries()) {
+                       for (Map.Entry<Long, String> entry : 
ctx.getBroadcastState(descriptor).immutableEntries()) {
                                map.put(entry.getKey(), entry.getValue());
                        }
 

Reply via email to