[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-08 Thread kl0u
Github user kl0u closed the pull request at:

https://github.com/apache/flink/pull/5230


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-06 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r166344325
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java ---
@@ -38,6 +38,37 @@
 	 */
 	void setCurrentKey(K newKey);
 
+	/**
+	 * Returns a safe version of the current key (see {@link #setCurrentKey(Object)}).
+	 *
+	 * <p>"Safe" means that the user can interact with it without jeopardizing correctness.
+	 * This implies that:
+	 * <ul>
+	 *     <li>for the {@code MemoryStateBackend} and the {@code FsStateBackend} we
+	 *     return a copy of the actual key, while</li>
+	 *     <li>for the {@code RocksDBStateBackend} we return the key itself, as returned
+	 *     by the backend.</li>
+	 * </ul>
+	 *
+	 * <p>The copy is created using the {@link TypeSerializer#copy(Object) copy()} method
+	 * of the key {@link TypeSerializer}. Consequently, the correctness of the method assumes
+	 * a correct {@code copy()} method.
+	 */
+	K getCurrentKeySafe();
+
+	/**
+	 * Applies the provided {@link KeyedStateFunction} to the state with the provided
+	 * {@link StateDescriptor} of all the currently active keys.
+	 *
+	 * @param stateDescriptor the descriptor of the state to which the function is going to be applied.
+	 * @param function the function to be applied to the keyed state.
+	 */
+	void applyToAllKeys(
--- End diff --

So the user should provide namespace and namespace serializer? 


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-06 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r166344317
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java ---
@@ -38,6 +38,37 @@
 	 */
 	void setCurrentKey(K newKey);
 
+	/**
+	 * Returns a safe version of the current key (see {@link #setCurrentKey(Object)}).
+	 *
+	 * <p>"Safe" means that the user can interact with it without jeopardizing correctness.
+	 * This implies that:
+	 * <ul>
+	 *     <li>for the {@code MemoryStateBackend} and the {@code FsStateBackend} we
+	 *     return a copy of the actual key, while</li>
+	 *     <li>for the {@code RocksDBStateBackend} we return the key itself, as returned
+	 *     by the backend.</li>
+	 * </ul>
+	 *
+	 * <p>The copy is created using the {@link TypeSerializer#copy(Object) copy()} method
+	 * of the key {@link TypeSerializer}. Consequently, the correctness of the method assumes
+	 * a correct {@code copy()} method.
+	 */
+	K getCurrentKeySafe();
--- End diff --

Ok I will remove this.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-06 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r166337384
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java ---
@@ -38,6 +38,37 @@
 	 */
 	void setCurrentKey(K newKey);
 
+	/**
+	 * Returns a safe version of the current key (see {@link #setCurrentKey(Object)}).
+	 *
+	 * <p>"Safe" means that the user can interact with it without jeopardizing correctness.
+	 * This implies that:
+	 * <ul>
+	 *     <li>for the {@code MemoryStateBackend} and the {@code FsStateBackend} we
+	 *     return a copy of the actual key, while</li>
+	 *     <li>for the {@code RocksDBStateBackend} we return the key itself, as returned
+	 *     by the backend.</li>
+	 * </ul>
+	 *
+	 * <p>The copy is created using the {@link TypeSerializer#copy(Object) copy()} method
+	 * of the key {@link TypeSerializer}. Consequently, the correctness of the method assumes
+	 * a correct {@code copy()} method.
+	 */
+	K getCurrentKeySafe();
--- End diff --

I think we don't need this, since the methods on broadcast state also don't make sure that 
returned values are safe, i.e. if a user modifies a returned value they break things. In the 
other parts we document in the Javadoc that users shouldn't modify the value; I think we can 
go down the same route here, which makes iteration over all keys cheaper.
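
In sketch form, the Javadoc-only route could look like this (wording and method shown are illustrative, not the code that was merged):

```java
/**
 * Returns the key that state is currently scoped to (see {@link #setCurrentKey(Object)}).
 *
 * <p>The returned object may be the backend's internal instance. It must NOT be modified by
 * the caller; mutating it can corrupt state in the heap-based state backends.
 */
K getCurrentKey();
```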


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-06 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r166337949
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java ---
@@ -38,6 +38,37 @@
 	 */
 	void setCurrentKey(K newKey);
 
+	/**
+	 * Returns a safe version of the current key (see {@link #setCurrentKey(Object)}).
+	 *
+	 * <p>"Safe" means that the user can interact with it without jeopardizing correctness.
+	 * This implies that:
+	 * <ul>
+	 *     <li>for the {@code MemoryStateBackend} and the {@code FsStateBackend} we
+	 *     return a copy of the actual key, while</li>
+	 *     <li>for the {@code RocksDBStateBackend} we return the key itself, as returned
+	 *     by the backend.</li>
+	 * </ul>
+	 *
+	 * <p>The copy is created using the {@link TypeSerializer#copy(Object) copy()} method
+	 * of the key {@link TypeSerializer}. Consequently, the correctness of the method assumes
+	 * a correct {@code copy()} method.
+	 */
+	K getCurrentKeySafe();
+
+	/**
+	 * Applies the provided {@link KeyedStateFunction} to the state with the provided
+	 * {@link StateDescriptor} of all the currently active keys.
+	 *
+	 * @param stateDescriptor the descriptor of the state to which the function is going to be applied.
+	 * @param function the function to be applied to the keyed state.
+	 */
+	void applyToAllKeys(
--- End diff --

I think this method needs to take the namespace. The current implementation 
in `AbstractKeyedStateBackend` always iterates over `VoidNamespace` implicitly, 
which can be a bit surprising.
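
For reference, a namespace-aware variant along these lines could look as follows (a sketch only; the generic bounds and parameter order are assumptions, not necessarily what was merged):

```java
/**
 * Applies the provided {@link KeyedStateFunction} to the state with the provided
 * {@link StateDescriptor}, for all currently active keys in the given namespace.
 */
<N, S extends State, T> void applyToAllKeys(
		N namespace,
		TypeSerializer<N> namespaceSerializer,
		StateDescriptor<S, T> stateDescriptor,
		KeyedStateFunction<K, S> function) throws Exception;
```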


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165399750
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---
@@ -586,7 +586,7 @@ private StreamGraph 
generateInternal(List transformatio
transform.getOutputType(),
transform.getName());
 
-   if (transform.getStateKeySelector1() != null) {
+	if (transform.getStateKeySelector1() != null || transform.getStateKeySelector2() != null) {
--- End diff --

Does this, in combination with the comment below on the `DataStreamTest`, mean that you want 
to move the `DataStreamTest` test into a separate `ITCase`?


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165398176
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---
@@ -753,6 +763,182 @@ public void onTimer(
 		assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator);
 	}
 
+	@Test
+	public void testConnectWithBroadcastTranslation() throws Exception {
+
+		final Map expected = new HashMap<>();
+		expected.put(0L, "test:0");
+		expected.put(1L, "test:1");
+		expected.put(2L, "test:2");
+		expected.put(3L, "test:3");
+		expected.put(4L, "test:4");
+		expected.put(5L, "test:5");
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		final DataStream srcOne = env.generateSequence(0L, 5L)
+				.assignTimestampsAndWatermarks(new CustomWmEmitter() {
+
+					@Override
+					public long extractTimestamp(Long element, long previousElementTimestamp) {
+						return element;
+					}
+				}).keyBy((KeySelector) value -> value);
+
+		final DataStream srcTwo = env.fromCollection(expected.values())
+				.assignTimestampsAndWatermarks(new CustomWmEmitter() {
+					@Override
+					public long extractTimestamp(String element, long previousElementTimestamp) {
+						return Long.parseLong(element.split(":")[1]);
+					}
+				});
+
+		final BroadcastStream broadcast = srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR);
+
+		// the timestamp should be high enough to trigger the timer after all the elements arrive.
+		final DataStream output = srcOne.connect(broadcast).process(
+				new TestBroadcastProcessFunction(10L, expected));
+
+		output.addSink(new DiscardingSink<>());
+		env.execute();
+	}
+
+	private abstract static class CustomWmEmitter implements AssignerWithPunctuatedWatermarks {
+
+		@Nullable
+		@Override
+		public Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) {
+			return new Watermark(extractedTimestamp);
+		}
+	}
+
+	private static class TestBroadcastProcessFunction extends KeyedBroadcastProcessFunction {
+
+		private final Map expectedState;
+
+		private final long timerTimestamp;
+
+		static final MapStateDescriptor DESCRIPTOR = new MapStateDescriptor<>(
+				"broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+		);
+
+		TestBroadcastProcessFunction(
+				final long timerTS,
+				final Map expectedBroadcastState
+		) {
+			expectedState = expectedBroadcastState;
+			timerTimestamp = timerTS;
+		}
+
+		@Override
+		public void processElement(Long value, KeyedReadOnlyContext ctx, Collector out) throws Exception {
+			ctx.timerService().registerEventTimeTimer(timerTimestamp);
+		}
+
+		@Override
+		public void processBroadcastElement(String value, KeyedReadWriteContext ctx, Collector out) throws Exception {
+			long key = Long.parseLong(value.split(":")[1]);
+			ctx.getBroadcastState(DESCRIPTOR).put(key, value);
+		}
+
+		@Override
+		public void onTimer(long timestamp, OnTimerContext ctx, Collector out) throws Exception {
+			Map map = new HashMap<>();
+			for (Map.Entry entry : ctx.getBroadcastState(DESCRIPTOR).immutableEntries()) {
+				map.put(entry.getKey(), entry.getValue());
+			}
+			Assert.assertEquals(expectedState, map);
+		}
+	}
+
+	/**
+	 * Tests that with a {@link KeyedStream} we have to provide a {@link KeyedBroadcastProcessFunction}.
+	 */
+

[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165395176
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java ---
@@ -586,7 +586,7 @@ private StreamGraph 
generateInternal(List transformatio
transform.getOutputType(),
transform.getName());
 
-   if (transform.getStateKeySelector1() != null) {
+	if (transform.getStateKeySelector1() != null || transform.getStateKeySelector2() != null) {
--- End diff --

I think we need some test that using keyed state actually works when only 
one of the inputs is keyed. I think we need an `ITCase` for that somewhere.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165397632
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---
@@ -753,6 +763,182 @@ public void onTimer(
 		assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator);
 	}
 
+	@Test
+	public void testConnectWithBroadcastTranslation() throws Exception {
+
+		final Map expected = new HashMap<>();
+		expected.put(0L, "test:0");
+		expected.put(1L, "test:1");
+		expected.put(2L, "test:2");
+		expected.put(3L, "test:3");
+		expected.put(4L, "test:4");
+		expected.put(5L, "test:5");
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		final DataStream srcOne = env.generateSequence(0L, 5L)
+				.assignTimestampsAndWatermarks(new CustomWmEmitter() {
+
+					@Override
+					public long extractTimestamp(Long element, long previousElementTimestamp) {
+						return element;
+					}
+				}).keyBy((KeySelector) value -> value);
+
+		final DataStream srcTwo = env.fromCollection(expected.values())
+				.assignTimestampsAndWatermarks(new CustomWmEmitter() {
+					@Override
+					public long extractTimestamp(String element, long previousElementTimestamp) {
+						return Long.parseLong(element.split(":")[1]);
+					}
+				});
+
+		final BroadcastStream broadcast = srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR);
+
+		// the timestamp should be high enough to trigger the timer after all the elements arrive.
+		final DataStream output = srcOne.connect(broadcast).process(
+				new TestBroadcastProcessFunction(10L, expected));
+
+		output.addSink(new DiscardingSink<>());
+		env.execute();
--- End diff --

So far, all tests in this are purely translation tests. I mentioned this in 
another comment, that it would be good to have an ITCase that actually verifies 
that using keyed state works and that the other features work as well in a 
complete program. Have a look at `SideOutputITCase`, for example. 👍 


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165394684
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
+import org.apache.flink.api.common.state.ReadWriteBroadcastState;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * The base class containing the functionality available to all broadcast process functions.
+ * These include the {@link BroadcastProcessFunction} and the {@link KeyedBroadcastProcessFunction}.
+ */
+@PublicEvolving
+public abstract class BaseBroadcastProcessFunction extends AbstractRichFunction {
+
+	private static final long serialVersionUID = -131631008887478610L;
+
+	/**
+	 * The base context available to all methods in a broadcast process function. This
+	 * includes {@link BroadcastProcessFunction BroadcastProcessFunctions} and
+	 * {@link KeyedBroadcastProcessFunction KeyedBroadcastProcessFunctions}.
+	 */
+	abstract class Context {
+
+		/**
+		 * Timestamp of the element currently being processed or timestamp of a firing timer.
+		 *
+		 * <p>This might be {@code null}, for example if the time characteristic of your program
+		 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+		 */
+		public abstract Long timestamp();
+
+		/**
+		 * Emits a record to the side output identified by the {@link OutputTag}.
+		 *
+		 * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
+		 * @param value The record to emit.
+		 */
+		public abstract <X> void output(OutputTag<X> outputTag, X value);
+
+		/** Returns the current processing time. */
+		public abstract long currentProcessingTime();
+
+		/** Returns the current event-time watermark. */
+		public abstract long currentWatermark();
+	}
+
+	/**
+	 * A base {@link Context context} available to the broadcasted stream side of
+	 * a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream BroadcastConnectedStream}.
+	 *
+	 * <p>Apart from the basic functionality of a {@link Context context},
+	 * this also allows to get and update the elements stored in the
+	 * {@link ReadWriteBroadcastState broadcast state}.
+	 * In other words, it gives read/write access to the broadcast state.
+	 */
+	public abstract class ReadWriteContext extends Context {
--- End diff --

I think this could be called `Context`, similar to my comment on 
`ReadWriteBroadcastState`.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165395860
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
+import org.apache.flink.api.common.state.ReadWriteBroadcastState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link TwoInputStreamOperator} for executing {@link 
KeyedBroadcastProcessFunction KeyedBroadcastProcessFunctions}.
+ *
+ * @param  The key type of the input keyed stream.
+ * @param  The input type of the keyed (non-broadcast) side.
+ * @param  The input type of the broadcast side.
+ * @param  The output type of the operator.
+ */
+@Internal
+public class CoBroadcastWithKeyedOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator, 
Triggerable {
+
+   private static final long serialVersionUID = 5926499536290284870L;
+
+   private final List broadcastStateDescriptors;
+
+   private transient TimestampedCollector collector;
+
+   private transient Map broadcastStates;
+
+   private transient ReadWriteContextImpl rwContext;
+
+   private transient ReadOnlyContextImpl rContext;
+
+   private transient OnTimerContextImpl onTimerContext;
+
+   public CoBroadcastWithKeyedOperator(
+   final KeyedBroadcastProcessFunction 
function,
+   final List 
broadcastStateDescriptors) {
+   super(function);
+   this.broadcastStateDescriptors = 
Preconditions.checkNotNull(broadcastStateDescriptors);
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+
+   InternalTimerService internalTimerService =
+   getInternalTimerService("user-timers", 
VoidNamespaceSerializer.INSTANCE, this);
+
+   TimerService timerService = new 
SimpleTimerService(internalTimerService);
+
+   collector = new TimestampedCollector<>(output);
+

[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165387915
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---
@@ -57,18 +63,25 @@ public int getVersion() {
@Override
public int[] getCompatibleVersions() {
	// we are compatible with version 2 (Flink 1.3.x) and version 1 (Flink 1.2.x)
-   return new int[] {VERSION, 1};
+   return new int[] {VERSION, 2, 1};
--- End diff --

This should probably now say "we are compatible with versions 3, 2, and 1".
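
In sketch form, the suggested comment next to the already-updated return value from the quoted diff:

```java
@Override
public int[] getCompatibleVersions() {
	// we are compatible with versions 3 (current), 2 (Flink 1.3.x) and 1 (Flink 1.2.x)
	return new int[] {VERSION, 2, 1};
}
```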


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165385868
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---
@@ -148,21 +170,27 @@ public void close() throws IOException {
@Override
public void dispose() {
IOUtils.closeQuietly(closeStreamOnCancelRegistry);
-   registeredStates.clear();
+   registeredOperatorStates.clear();
+   registeredBroadcastStates.clear();
}
 
// 
---
//  State access methods
// 
---
 
+   @Override
+   public  ReadWriteBroadcastState 
getBroadcastState(MapStateDescriptor stateDescriptor) throws Exception {
--- End diff --

I think we don't need this because the other `getBroadcastState()` is only 
ever called with `BROADCAST` mode.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165388777
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---
@@ -77,16 +90,33 @@ public void read(DataInputView in) throws IOException {
super.read(in);
 
int numKvStates = in.readShort();
-   stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+   operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates);
for (int i = 0; i < numKvStates; i++) {
-   stateMetaInfoSnapshots.add(
-   
OperatorBackendStateMetaInfoSnapshotReaderWriters
-   .getReaderForVersion(getReadVersion(), 
userCodeClassLoader)
-   .readStateMetaInfo(in));
+   operatorStateMetaInfoSnapshots.add(
+   
OperatorBackendStateMetaInfoSnapshotReaderWriters
+   
.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
+   
.readOperatorStateMetaInfo(in));
}
+
+   if (VERSION >= 3) {
--- End diff --

Wouldn't this always depend on the version of the code and not the version 
of the snapshot? That is, if we restore from a `VERSION < 3` snapshot we should 
not go into this code path.

I think here you can get that via `getReadVersion()`.
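
A minimal sketch of that gating, assuming the proxy's `VERSION` has been bumped to 3 (field and method names are taken from the quoted code; the per-entry reading is elided):

```java
// gate on the version read from the snapshot, not on the code's VERSION constant:
// only snapshots written with version >= 3 contain broadcast state meta info
if (getReadVersion() >= 3) {
	int numBroadcastStates = in.readShort();
	broadcastStateMetaInfoSnapshots = new ArrayList<>(numBroadcastStates);
	for (int i = 0; i < numBroadcastStates; i++) {
		// read one broadcast state meta info snapshot, as in the quoted read() method
	}
} else {
	broadcastStateMetaInfoSnapshots = new ArrayList<>();
}
```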


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165378004
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/ReadWriteBroadcastState.java ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A type of state that can be created to store the state of a {@code 
BroadcastStream}. This state assumes that
+ * the same elements are sent to all instances of an operator.
+ *
+ * CAUTION: the user has to guarantee that all task instances 
store the same elements in this type of state.
+ *
+ *  Each operator instance individually maintains and stores elements 
in the broadcast state. The fact that the
+ * incoming stream is a broadcast one guarantees that all instances see 
all the elements. Upon recovery
+ * or re-scaling, the same state is given to each of the instances. To 
avoid hotspots, each task reads its previous
+ * partition, and if there are more tasks (scale up), then the new 
instances read from the old instances in a round
+ * robin fashion. This is why each instance has to guarantee that it 
stores the same elements as the rest. If not,
+ * upon recovery or rescaling you may have unpredictable redistribution of 
the partitions, thus unpredictable results.
+ *
+ * @param  The key type of the elements in the {@link 
ReadWriteBroadcastState}.
+ * @param  The value type of the elements in the {@link 
ReadWriteBroadcastState}.
+ */
+@PublicEvolving
+public interface ReadWriteBroadcastState extends 
ReadOnlyBroadcastState {
--- End diff --

I think we can call this just `BroadcastState`.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165377821
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/ReadOnlyBroadcastState.java ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/**
+ * A read-only type of state that gives read-only access to the state of a 
{@code BroadcastStream}
--- End diff --

I think the `access to the state of a BroadcastStream` bit might be a bit confusing because 
it ties it too heavily to broadcast streams.

We should also have a big warning here that you should not modify the result of `get()`, and 
also that you should not modify the entries in the immutable iterator, because this would 
lead to problems with the heap state backend.
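
One way such a warning could be phrased (a sketch; neither the interface body nor the wording is the merged version):

```java
/**
 * A read-only view of state holding key-value pairs registered with a {@link MapStateDescriptor}.
 *
 * <p><b>Important:</b> do not modify the values returned by {@code get()} or the entries exposed
 * by the immutable iterator. With the heap-based state backends these objects may be the ones
 * stored in the backend itself, so mutating them corrupts the state.
 */
@PublicEvolving
public interface ReadOnlyBroadcastState<K, V> extends State {

	/** Returns the value for the given key; the returned object must not be mutated. */
	V get(K key) throws Exception;

	/** Returns a read-only iterable over the entries; the entries must not be modified. */
	Iterable<Map.Entry<K, V>> immutableEntries() throws Exception;
}
```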


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163938277
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+public class RegisteredBroadcastBackendStateMetaInfo {
+
+   /** The name of the state, as registered by the user. */
+   private final String name;
+
+   /** The mode how elements in this state are assigned to tasks during 
restore. */
+   private final OperatorStateHandle.Mode assignmentMode;
+
+   /** The type serializer for the keys in the map state. */
+   private final TypeSerializer keySerializer;
+
+   /** The type serializer for the values in the map state. */
+   private final TypeSerializer valueSerializer;
+
+   public RegisteredBroadcastBackendStateMetaInfo(
+   final String name,
+   final OperatorStateHandle.Mode assignmentMode,
+   final TypeSerializer keySerializer,
+   final TypeSerializer valueSerializer) {
+
+   Preconditions.checkArgument(assignmentMode != null && 
assignmentMode == OperatorStateHandle.Mode.UNIFORM_BROADCAST);
+
+   this.name = Preconditions.checkNotNull(name);
+   this.assignmentMode = assignmentMode;
+   this.keySerializer = Preconditions.checkNotNull(keySerializer);
+   this.valueSerializer = 
Preconditions.checkNotNull(valueSerializer);
+   }
+
+   public String getName() {
+   return name;
+   }
+
+   public TypeSerializer getKeySerializer() {
+   return keySerializer;
+   }
+
+   public TypeSerializer getValueSerializer() {
+   return valueSerializer;
+   }
+
+   public OperatorStateHandle.Mode getAssignmentMode() {
+   return assignmentMode;
+   }
+
+   public RegisteredBroadcastBackendStateMetaInfo.Snapshot 
snapshot() {
+   return new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>(
+   name,
+   assignmentMode,
+   keySerializer.duplicate(),
+   valueSerializer.duplicate(),
+   keySerializer.snapshotConfiguration(),
+   valueSerializer.snapshotConfiguration());
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (obj == this) {
+   return true;
+   }
+
+   if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) {
+   return false;
+   }
+
+   final RegisteredBroadcastBackendStateMetaInfo other =
+   (RegisteredBroadcastBackendStateMetaInfo) obj;
+
+   return Objects.equals(name, other.getName())
+   && Objects.equals(assignmentMode, 
other.getAssignmentMode())
+   && Objects.equals(keySerializer, 
other.getKeySerializer())
+   && Objects.equals(valueSerializer, 
other.getValueSerializer());
+   }
+
+   @Override
+   public int hashCode() {
+   int result = name.hashCode();
+   result = 31 * result + assignmentMode.hashCode();
+   result = 31 * result + keySerializer.hashCode();
+   result = 31 * result + valueSerializer.hashCode();
+   return result;
+   }
+
+   @Override
+   public String toString() {
+   return 

[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163933180
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---
@@ -77,16 +90,29 @@ public void read(DataInputView in) throws IOException {
super.read(in);
 
int numKvStates = in.readShort();
-   stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+   operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates);
for (int i = 0; i < numKvStates; i++) {
-   stateMetaInfoSnapshots.add(
-   
OperatorBackendStateMetaInfoSnapshotReaderWriters
-   .getReaderForVersion(getReadVersion(), 
userCodeClassLoader)
-   .readStateMetaInfo(in));
+   operatorStateMetaInfoSnapshots.add(
+   
OperatorBackendStateMetaInfoSnapshotReaderWriters
+   
.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
+   
.readOperatorStateMetaInfo(in));
}
+
+   int numBroadcastStates = in.readShort();
--- End diff --

This here (and onwards) would fail if we're reading older version 
savepoints, because there was nothing written for this before.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163936390
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java ---
@@ -36,8 +36,9 @@
 * The modes that determine how an {@link OperatorStateHandle} is 
assigned to tasks during restore.
 */
public enum Mode {
-	SPLIT_DISTRIBUTE, // The operator state partitions in the state handle are split and distributed to one task each.
-	BROADCAST // The operator state partitions are broadcast to all task.
+	SPLIT_DISTRIBUTE,	// The operator state partitions in the state handle are split and distributed to one task each.
+	BROADCAST,		// The operator state partitions are broadcasted to all tasks.
+	UNIFORM_BROADCAST	// The operator states are identical, and they are broadcasted to all tasks.
--- End diff --

nit: can we either stick with spaces here, or at least tab them so that the three comments 
are aligned?



---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163938102
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/RegisteredBroadcastBackendStateMetaInfo.java ---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+public class RegisteredBroadcastBackendStateMetaInfo {
+
+   /** The name of the state, as registered by the user. */
+   private final String name;
+
+   /** The mode how elements in this state are assigned to tasks during 
restore. */
+   private final OperatorStateHandle.Mode assignmentMode;
+
+   /** The type serializer for the keys in the map state. */
+   private final TypeSerializer keySerializer;
+
+   /** The type serializer for the values in the map state. */
+   private final TypeSerializer valueSerializer;
+
+   public RegisteredBroadcastBackendStateMetaInfo(
+   final String name,
+   final OperatorStateHandle.Mode assignmentMode,
+   final TypeSerializer keySerializer,
+   final TypeSerializer valueSerializer) {
+
+   Preconditions.checkArgument(assignmentMode != null && 
assignmentMode == OperatorStateHandle.Mode.UNIFORM_BROADCAST);
+
+   this.name = Preconditions.checkNotNull(name);
+   this.assignmentMode = assignmentMode;
+   this.keySerializer = Preconditions.checkNotNull(keySerializer);
+   this.valueSerializer = 
Preconditions.checkNotNull(valueSerializer);
+   }
+
+   public String getName() {
+   return name;
+   }
+
+   public TypeSerializer getKeySerializer() {
+   return keySerializer;
+   }
+
+   public TypeSerializer getValueSerializer() {
+   return valueSerializer;
+   }
+
+   public OperatorStateHandle.Mode getAssignmentMode() {
+   return assignmentMode;
+   }
+
+   public RegisteredBroadcastBackendStateMetaInfo.Snapshot 
snapshot() {
+   return new RegisteredBroadcastBackendStateMetaInfo.Snapshot<>(
+   name,
+   assignmentMode,
+   keySerializer.duplicate(),
+   valueSerializer.duplicate(),
+   keySerializer.snapshotConfiguration(),
+   valueSerializer.snapshotConfiguration());
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+   if (obj == this) {
+   return true;
+   }
+
+   if (!(obj instanceof RegisteredBroadcastBackendStateMetaInfo)) {
+   return false;
+   }
+
+   final RegisteredBroadcastBackendStateMetaInfo other =
+   (RegisteredBroadcastBackendStateMetaInfo) obj;
+
+   return Objects.equals(name, other.getName())
+   && Objects.equals(assignmentMode, 
other.getAssignmentMode())
+   && Objects.equals(keySerializer, 
other.getKeySerializer())
+   && Objects.equals(valueSerializer, 
other.getValueSerializer());
+   }
+
+   @Override
+   public int hashCode() {
+   int result = name.hashCode();
+   result = 31 * result + assignmentMode.hashCode();
+   result = 31 * result + keySerializer.hashCode();
+   result = 31 * result + valueSerializer.hashCode();
+   return result;
+   }
+
+   @Override
+   public String toString() {
+   return 

[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163935065
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---
@@ -77,16 +90,29 @@ public void read(DataInputView in) throws IOException {
super.read(in);
 
int numKvStates = in.readShort();
-   stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+   operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates);
for (int i = 0; i < numKvStates; i++) {
-   stateMetaInfoSnapshots.add(
-   
OperatorBackendStateMetaInfoSnapshotReaderWriters
-   .getReaderForVersion(getReadVersion(), 
userCodeClassLoader)
-   .readStateMetaInfo(in));
+   operatorStateMetaInfoSnapshots.add(
+   
OperatorBackendStateMetaInfoSnapshotReaderWriters
+   
.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
+   
.readOperatorStateMetaInfo(in));
}
+
+   int numBroadcastStates = in.readShort();
--- End diff --

One straightforward way to fix this is to uptick the current `VERSION` to 3, and here you do:

```
if (getReadVersion() >= 3) {
// read broadcast state stuff
}
```

so we only try to read the broadcast state stuff if the version written in the savepoint is 
greater than or equal to 3.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163935632
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java ---
@@ -211,4 +297,34 @@ public 
OperatorBackendStateMetaInfoReaderV2(ClassLoader userCodeClassLoader) {
return stateMetaInfo;
}
}
+
+   public static class BroadcastStateMetaInfoReaderV2 extends 
AbstractBroadcastStateMetaInfoReader {
+
+   public BroadcastStateMetaInfoReaderV2(final ClassLoader 
userCodeClassLoader) {
+   super(userCodeClassLoader);
+   }
+
+   @Override
+   public RegisteredBroadcastBackendStateMetaInfo.Snapshot 
readBroadcastStateMetaInfo(final DataInputView in) throws IOException {
+   RegisteredBroadcastBackendStateMetaInfo.Snapshot 
stateMetaInfo =
+   new 
RegisteredBroadcastBackendStateMetaInfo.Snapshot<>();
+
+   stateMetaInfo.setName(in.readUTF());
+   
stateMetaInfo.setAssignmentMode(OperatorStateHandle.Mode.values()[in.readByte()]);
+
+   Tuple2 
keySerializerAndConfig =
+   
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, 
userCodeClassLoader).get(0);
--- End diff --

If the `writeSerializersAndConfigsWithResilience` call were a single one in the writer, then 
here you could also just get all written serializers and configs with a single 
`readSerializersAndConfigsWithResilience`. The returned list would have length 2 (the 
key/value serializers and configs come back in the same order in which they were written).
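
A sketch of the single-call read (variable names are illustrative):

```java
// one resilient read returns both pairs, in the order in which they were written:
// index 0 = key serializer + config snapshot, index 1 = value serializer + config snapshot
List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs =
		TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, userCodeClassLoader);

Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> keySerializerAndConfig = serializersAndConfigs.get(0);
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> valueSerializerAndConfig = serializersAndConfigs.get(1);
```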


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163932401
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---
@@ -64,11 +70,18 @@ public int getVersion() {
public void write(DataOutputView out) throws IOException {
super.write(out);
 
-   out.writeShort(stateMetaInfoSnapshots.size());
-   for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState 
: stateMetaInfoSnapshots) {
+   out.writeShort(operatorStateMetaInfoSnapshots.size());
+   for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState 
: operatorStateMetaInfoSnapshots) {
+   OperatorBackendStateMetaInfoSnapshotReaderWriters
+   
.getOperatorStateWriterForVersion(VERSION, kvState)
+   .writeOperatorStateMetaInfo(out);
+   }
+
+   out.writeShort(broadcastStateMetaInfoSnapshots.size());
+   for (RegisteredBroadcastBackendStateMetaInfo.Snapshot 
kvState : broadcastStateMetaInfoSnapshots) {
--- End diff --

same here: the naming of the `kvState` variable here is actually a bit odd


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163942745
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---
@@ -77,16 +90,29 @@ public void read(DataInputView in) throws IOException {
super.read(in);
 
int numKvStates = in.readShort();
-   stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+   operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates);
for (int i = 0; i < numKvStates; i++) {
-   stateMetaInfoSnapshots.add(
-   
OperatorBackendStateMetaInfoSnapshotReaderWriters
-   .getReaderForVersion(getReadVersion(), 
userCodeClassLoader)
-   .readStateMetaInfo(in));
+   operatorStateMetaInfoSnapshots.add(
+   
OperatorBackendStateMetaInfoSnapshotReaderWriters
+   
.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
+   
.readOperatorStateMetaInfo(in));
}
+
+   int numBroadcastStates = in.readShort();
--- End diff --

I think somehow the migration test cases are not failing here, only because 
`in.readShort()` happens to return 0.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163932956
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---
@@ -35,18 +35,24 @@
 
public static final int VERSION = 2;
--- End diff --

It seems like the `OperatorBackendSerializationProxy` will have new binary 
formats after this change.
This should then have an uptick in the VERSION.

In general, I think the PR currently does not have any migration paths for 
previous versions (where there is no broadcast state meta info written).
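
In sketch form, the version bump being asked for (the read side then also has to be gated on `getReadVersion()`, as discussed elsewhere in this thread):

```java
// OperatorBackendSerializationProxy: new binary format, so bump the version
public static final int VERSION = 3;
```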


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163934364
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendStateMetaInfoSnapshotReaderWriters.java ---
@@ -112,13 +142,40 @@ public void writeStateMetaInfo(DataOutputView out) 
throws IOException {
}
}
 
+   public static class BroadcastStateMetaInfoWriterV2 extends 
AbstractBroadcastStateMetaInfoWriter {
+
+   public BroadcastStateMetaInfoWriterV2(
+   final 
RegisteredBroadcastBackendStateMetaInfo.Snapshot broadcastStateMetaInfo) {
+   super(broadcastStateMetaInfo);
+   }
+
+   @Override
+   public void writeBroadcastStateMetaInfo(final DataOutputView 
out) throws IOException {
+   out.writeUTF(broadcastStateMetaInfo.getName());
+   
out.writeByte(broadcastStateMetaInfo.getAssignmentMode().ordinal());
+
+   // write in a way that allows us to be fault-tolerant 
and skip blocks in the case of java serialization failures
+   
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
+   out,
+   Collections.singletonList(new Tuple2<>(
+   
broadcastStateMetaInfo.getKeySerializer(),
+   
broadcastStateMetaInfo.getKeySerializerConfigSnapshot(;
+
+   
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
--- End diff --

Combining these two `writeSerializersAndConfigsWithResilience` calls into one call, with a 
single list containing both the key serializer and the value serializer, would be more 
space-efficient in the written data:

```
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(
	out,
	Arrays.asList(
		Tuple2.of(keySerializer, keySerializerConfig),
		Tuple2.of(valueSerializer, valueSerializerConfig)));
```


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163930707
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---
@@ -137,7 +155,12 @@ public ExecutionConfig getExecutionConfig() {
 
@Override
public Set getRegisteredStateNames() {
-   return registeredStates.keySet();
+   Set stateNames = new HashSet<>(
--- End diff --

Might not make sense to have a new `HashSet` every time 
`getRegisteredStateNames` is called.
OTOH, would it make sense to have a separate 
`getRegisteredBroadcastStateNames` on the interface?
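
A sketch of that second option (the method name `getRegisteredBroadcastStateNames` is the suggestion from this comment, not merged API; the field names come from the quoted code):

```java
@Override
public Set<String> getRegisteredStateNames() {
	return registeredOperatorStates.keySet();
}

public Set<String> getRegisteredBroadcastStateNames() {
	return registeredBroadcastStates.keySet();
}
```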


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163932340
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java ---
@@ -64,11 +70,18 @@ public int getVersion() {
public void write(DataOutputView out) throws IOException {
super.write(out);
 
-   out.writeShort(stateMetaInfoSnapshots.size());
-   for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState 
: stateMetaInfoSnapshots) {
+   out.writeShort(operatorStateMetaInfoSnapshots.size());
+   for (RegisteredOperatorBackendStateMetaInfo.Snapshot kvState 
: operatorStateMetaInfoSnapshots) {
--- End diff --

the naming of the `kvState` variable here is actually a bit odd


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163936903
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java ---
@@ -36,8 +36,9 @@
 * The modes that determine how an {@link OperatorStateHandle} is 
assigned to tasks during restore.
 */
public enum Mode {
-	SPLIT_DISTRIBUTE, // The operator state partitions in the state handle are split and distributed to one task each.
-	BROADCAST // The operator state partitions are broadcast to all task.
+	SPLIT_DISTRIBUTE,	// The operator state partitions in the state handle are split and distributed to one task each.
+	BROADCAST,		// The operator state partitions are broadcasted to all tasks.
--- End diff --

Maybe naming this mode `BROADCAST` was not ideal in the first place (perhaps `UNION`, to 
correspond to the API name, would be better). Looking at the names / comments alone, the 
difference between `BROADCAST` and `UNIFORM_BROADCAST` is actually quite confusing.
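
A sketch of the renaming hinted at here (names and comments are a proposal only, not what was merged):

```java
public enum Mode {
	SPLIT_DISTRIBUTE,  // state partitions are split and distributed, one partition per task
	UNION,             // every task gets all partitions on restore (the mode currently called BROADCAST)
	UNIFORM_BROADCAST  // all tasks wrote identical state, so each task can restore the same partition
}
```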


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163872816
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import java.io.IOException;
+
+/**
+ * An interface with methods related to the interplay between the {@link BroadcastState}
+ * and the {@link OperatorStateBackend}.
+ *
+ * @param <K> The key type of the elements in the {@link BroadcastState}.
+ * @param <V> The value type of the elements in the {@link BroadcastState}.
+ */
+public interface BackendWritableBroadcastState<K, V> extends BroadcastState<K, V> {
+
+	BackendWritableBroadcastState<K, V> deepCopy();
+
+	long[] write(FSDataOutputStream out) throws IOException;
--- End diff --

Here I believe we could return just the offset of the first element, as we treat the whole 
block containing the broadcast state as a single element. It is true that even this is not 
strictly needed, but I left it for compatibility with previous code.

Is this what you meant?
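
In sketch form, the narrower contract described above (illustrative; the single-offset variant is not necessarily what ended up in the PR):

```java
/**
 * Writes the whole broadcast state as a single block and returns the offset at which
 * that block starts in the output stream.
 */
long write(FSDataOutputStream out) throws IOException;
```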


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163870691
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/BroadcastState.java ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/**
+ * A type of state that can be created to store the state of a {@code 
BroadcastStream}. This state assumes that
+ * the same elements are sent to all instances of an operator.
+ *
+ * CAUTION: the user has to guarantee that all task instances 
store the same elements in this type of state.
+ *
+ *  Each operator instance individually maintains and stores elements 
in the broadcast state. The fact that the
+ * incoming stream is a broadcast one guarantees that all instances see 
all the elements. Upon recovery
+ * or re-scaling, the same state is given to each of the instances. To 
avoid hotspots, each task reads its previous
+ * partition, and if there are more tasks (scale up), then the new 
instances read from the old instances in a round
+ * robin fashion. This is why each instance has to guarantee that it 
stores the same elements as the rest. If not,
+ * upon recovery or rescaling you may have unpredictable redistribution of 
the partitions, thus unpredictable results.
+ *
+ * @param  The key type of the elements in the {@link BroadcastState}.
+ * @param  The value type of the elements in the {@link BroadcastState}.
+ */
+@PublicEvolving
+public interface BroadcastState extends MapState {
+
+   /**
+* @return An {@link Iterable} over the entries in the state with 
read-only access.
+*/
+   Iterable> unmodifiableEntriesIt() throws Exception;
--- End diff --

Done.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163870666
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java ---
@@ -513,17 +630,100 @@ public void addAll(List values) throws Exception {
}
}
 
+   private  BroadcastState getBroadcastState(
+   final MapStateDescriptor stateDescriptor,
+   final OperatorStateHandle.Mode mode) throws 
StateMigrationException {
+
+   Preconditions.checkNotNull(stateDescriptor);
+   String name = 
Preconditions.checkNotNull(stateDescriptor.getName());
+
+   @SuppressWarnings("unchecked")
+   BackendWritableBroadcastState previous = 
(BackendWritableBroadcastState) accessedBroadcastStatesByName.get(name);
+   if (previous != null) {
+   checkStateNameAndMode(
+   previous.getStateMetaInfo().getName(),
+   name,
+   
previous.getStateMetaInfo().getAssignmentMode(),
+   mode);
+   return previous;
+   }
+
+   
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
+   TypeSerializer broadcastStateKeySerializer = 
Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
+   TypeSerializer broadcastStateValueSerializer = 
Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
+
+   BackendWritableBroadcastState broadcastState = 
(BackendWritableBroadcastState) registeredBroadcastStates.get(name);
+
+   if (broadcastState == null) {
+   broadcastState = new HeapBroadcastState<>(
+   new 
RegisteredBroadcastBackendStateMetaInfo<>(
+   name,
+   mode,
+   
broadcastStateKeySerializer,
+   
broadcastStateValueSerializer));
+   registeredBroadcastStates.put(name, broadcastState);
+   } else {
--- End diff --

No, because we have the `accessedBroadcastStatesByName.get(name)` check above 
(line 641).

As soon as we create or restore the broadcast state, we put it there (line 
708). The next time we access it, we hit the cache 
(`accessedBroadcastStatesByName`), so we do not go through the creation/check 
phase again.
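
In other words, the lookup is cache-first. A stripped-down sketch of that flow (plain Java with placeholder `Object` states; the names are made up and this is not the backend code):

    import java.util.HashMap;
    import java.util.Map;

    class CacheFirstStateLookupSketch {

        private final Map<String, Object> accessedStatesByName = new HashMap<>();
        private final Map<String, Object> registeredStates = new HashMap<>();

        Object getState(String name) {
            Object previous = accessedStatesByName.get(name);
            if (previous != null) {
                return previous;                      // cache hit: no creation/check phase
            }
            Object state = registeredStates.get(name);
            if (state == null) {
                state = new Object();                 // fresh state
                registeredStates.put(name, state);
            } else {
                checkCompatibility(name, state);      // restored state: checked once here
            }
            accessedStatesByName.put(name, state);    // later calls hit the cache above
            return state;
        }

        private void checkCompatibility(String name, Object state) {
            // elided in this sketch
        }
    }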


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163841958
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/BackendWritableBroadcastState.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+import java.io.IOException;
+
+/**
+ * An interface with methods related to the interplay between the {@link 
BroadcastState}
+ * and the {@link OperatorStateBackend}.
+ *
+ * @param <K> The key type of the elements in the {@link BroadcastState}.
+ * @param <V> The value type of the elements in the {@link BroadcastState}.
+ */
+public interface BackendWritableBroadcastState<K, V> extends 
BroadcastState<K, V> {
+
+   BackendWritableBroadcastState<K, V> deepCopy();
+
+   long[] write(FSDataOutputStream out) throws IOException;
--- End diff --

Why does the write method return all the partition offsets? Are they ever 
used for anything? I think this might be a leftover from `PartitionableListState`.
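
For comparison, a minimal sketch of a write path that needs no per-entry offsets, because the whole broadcast state goes out as one contiguous blob with a leading entry count (standard Java only; the key/value types are arbitrary and this is not the PR's serialization code):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.Map;

    class BroadcastStateWriteSketch {

        static byte[] write(Map<String, Integer> state) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(state.size());                    // entry count instead of offsets
                for (Map.Entry<String, Integer> e : state.entrySet()) {
                    out.writeUTF(e.getKey());
                    out.writeInt(e.getValue());
                }
            }
            return bytes.toByteArray();
        }
    }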


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163850320
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
 ---
@@ -753,4 +935,27 @@ private static Environment createMockEnvironment() {

when(env.getUserClassLoader()).thenReturn(OperatorStateBackendTest.class.getClassLoader());
return env;
}
+
+   private  Map retrieveBroadcastState(
--- End diff --

Can this not use the normal restore functionality of the backend?


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163849551
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ---
@@ -601,21 +805,43 @@ public void addAll(List values) throws Exception {
}
}
 
+   private static <K, V> void deserializeBroadcastStateValues(
+   final BackendWritableBroadcastState<K, V> 
broadcastStateForName,
+   final FSDataInputStream in,
+   final OperatorStateHandle.StateMetaInfo metaInfo) 
throws Exception {
+
+   if (metaInfo != null) {
+   long[] offsets = metaInfo.getOffsets();
+   if (offsets != null) {
+   DataInputView div = new 
DataInputViewStreamWrapper(in);
+
+   TypeSerializer<K> keySerializer = 
broadcastStateForName.getStateMetaInfo().getKeySerializer();
+   TypeSerializer<V> valueSerializer = 
broadcastStateForName.getStateMetaInfo().getValueSerializer();
+
+   for (long offset : offsets) {
+   in.seek(offset);
--- End diff --

As mentioned in `BackendWritableBroadcastState`, if we don't need the offsets, 
then we also don't need to do all the seeking here.
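
The matching read path would then be purely sequential; a sketch under the same assumptions as the write sketch earlier (standard Java, arbitrary key/value types, not the PR's code):

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    class BroadcastStateReadSketch {

        static Map<String, Integer> read(byte[] serialized) throws IOException {
            Map<String, Integer> state = new HashMap<>();
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
                int numEntries = in.readInt();
                for (int i = 0; i < numEntries; i++) {         // no seek(offset) needed
                    state.put(in.readUTF(), in.readInt());
                }
            }
            return state;
        }
    }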


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163840020
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ---
@@ -513,17 +630,100 @@ public void addAll(List values) throws Exception {
}
}
 
+   private <K, V> BroadcastState<K, V> getBroadcastState(
+   final MapStateDescriptor<K, V> stateDescriptor,
+   final OperatorStateHandle.Mode mode) throws 
StateMigrationException {
+
+   Preconditions.checkNotNull(stateDescriptor);
+   String name = 
Preconditions.checkNotNull(stateDescriptor.getName());
+
+   @SuppressWarnings("unchecked")
+   BackendWritableBroadcastState<K, V> previous = 
(BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);
+   if (previous != null) {
+   checkStateNameAndMode(
+   previous.getStateMetaInfo().getName(),
+   name,
+   
previous.getStateMetaInfo().getAssignmentMode(),
+   mode);
+   return previous;
+   }
+
+   
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
+   TypeSerializer<K> broadcastStateKeySerializer = 
Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
+   TypeSerializer<V> broadcastStateValueSerializer = 
Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
+
+   BackendWritableBroadcastState<K, V> broadcastState = 
(BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);
+
+   if (broadcastState == null) {
+   broadcastState = new HeapBroadcastState<>(
+   new 
RegisteredBroadcastBackendStateMetaInfo<>(
+   name,
+   mode,
+   
broadcastStateKeySerializer,
+   
broadcastStateValueSerializer));
+   registeredBroadcastStates.put(name, broadcastState);
+   } else {
--- End diff --

Does this do the compatibility-check dance every time the state is 
accessed? That might be a bit too much; we could do it only the first time a state 
is accessed after a restore.
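
A sketch of that suggestion (placeholder `Object` states, made-up names, not the actual backend): remember which restored states were already checked, so the compatibility check runs at most once per state name.

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class DeferredCompatibilityCheckSketch {

        private final Set<String> alreadyCheckedNames = new HashSet<>();
        private final Map<String, Object> restoredStates;

        DeferredCompatibilityCheckSketch(Map<String, Object> restoredStates) {
            this.restoredStates = restoredStates;
        }

        Object getState(String name) {
            Object state = restoredStates.get(name);
            if (state != null && alreadyCheckedNames.add(name)) {
                // add(name) is true only on the first access after restore
                checkCompatibility(name, state);
            }
            return state;
        }

        private void checkCompatibility(String name, Object state) {
            // elided in this sketch
        }
    }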


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163839057
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
 ---
@@ -228,36 +221,58 @@ private GroupByStateNameResults groupByStateName(
Map>> broadcastNameToState =

nameToStateByMode.getByMode(OperatorStateHandle.Mode.BROADCAST);
 
-   for (int i = 0; i < parallelism; ++i) {
+   for (int i = 0; i < newParallelism; ++i) {
 
Map mergeMap = 
mergeMapList.get(i);
 
for (Map.Entry>> e :
broadcastNameToState.entrySet()) {
 
-   List> current = e.getValue();
-
-   for (Tuple2 handleWithMetaInfo : current) {
+   for (Tuple2 handleWithMetaInfo : e.getValue()) {
OperatorStateHandle operatorStateHandle 
= mergeMap.get(handleWithMetaInfo.f0);
if (operatorStateHandle == null) {
-   operatorStateHandle = new 
OperatorStateHandle(
-   new 
HashMap(),
-   
handleWithMetaInfo.f0);
-
+   operatorStateHandle = new 
OperatorStateHandle(new HashMap<>(), handleWithMetaInfo.f0);

mergeMap.put(handleWithMetaInfo.f0, operatorStateHandle);
}

operatorStateHandle.getStateNameToPartitionOffsets().put(e.getKey(), 
handleWithMetaInfo.f1);
}
}
}
+
+   // Now we also add the state handles marked for broadcast to 
all parallel instances
+   Map>> uniformBroadcastNameToState =
+   
nameToStateByMode.getByMode(OperatorStateHandle.Mode.UNIFORM_BROADCAST);
+
+   for (int i = 0; i < newParallelism; ++i) {
+   // TODO: 11/29/17 should I take into account nulls?
--- End diff --

What about these two `TODOs`?


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r163838390
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/BroadcastState.java 
---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/**
+ * A type of state that can be created to store the state of a {@code 
BroadcastStream}. This state assumes that
+ * the same elements are sent to all instances of an operator.
+ *
+ * CAUTION: the user has to guarantee that all task instances 
store the same elements in this type of state.
+ *
+ *  Each operator instance individually maintains and stores elements 
in the broadcast state. The fact that the
+ * incoming stream is a broadcast one guarantees that all instances see 
all the elements. Upon recovery
+ * or re-scaling, the same state is given to each of the instances. To 
avoid hotspots, each task reads its previous
+ * partition, and if there are more tasks (scale up), then the new 
instances read from the old instances in a round
+ * robin fashion. This is why each instance has to guarantee that it 
stores the same elements as the rest. If not,
+ * upon recovery or rescaling you may have unpredictable redistribution of 
the partitions, thus unpredictable results.
+ *
+ * @param <K> The key type of the elements in the {@link BroadcastState}.
+ * @param <V> The value type of the elements in the {@link BroadcastState}.
+ */
+@PublicEvolving
+public interface BroadcastState<K, V> extends MapState<K, V> {
+
+   /**
+* @return An {@link Iterable} over the entries in the state with 
read-only access.
+*/
+   Iterable<Map.Entry<K, V>> unmodifiableEntriesIt() throws Exception;
--- End diff --

I would prefer `unmodifiableEntries`, or better `immutableEntries`, here. Also, 
having a Javadoc comment without an actual body is not typical; maybe just go 
for `Returns an immutable {@link Iterable} over the entries in the state.`
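
Applied to the quoted interface, the suggestion would look roughly like this (a sketch only, not the merged API):

    import org.apache.flink.api.common.state.MapState;

    import java.util.Map;

    public interface BroadcastState<K, V> extends MapState<K, V> {

        /**
         * Returns an immutable {@link Iterable} over the entries in the state.
         */
        Iterable<Map.Entry<K, V>> immutableEntries() throws Exception;
    }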


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-11 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r160904948
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import 
org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator;
+import 
org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator;
+import 
org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.util.Preconditions;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A BroadcastConnectedStream represents the result of connecting a stream 
with
--- End diff --

You mean simplify the Javadoc or the order of the input types?


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r159222752
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 ---
@@ -586,7 +586,7 @@ private StreamGraph 
generateInternal(List transformatio
transform.getOutputType(),
transform.getName());
 
-   if (transform.getStateKeySelector1() != null) {
+   if (transform.getStateKeySelector1() != null || 
transform.getStateKeySelector2() != null) {
--- End diff --

Do we still need the check here, or will the code work if we set either to 
`null`? I think it does work, because in the broadcast case we set the second 
key selector to `null`.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r159224065
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
 ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+/**
+ * A function to be applied to a
+ * {@link 
org.apache.flink.streaming.api.datastream.BroadcastConnectedStream 
BroadcastConnectedStream} that
+ * connects {@link 
org.apache.flink.streaming.api.datastream.BroadcastStream BroadcastStream}, 
i.e. a stream
+ * with broadcast state, with a {@link 
org.apache.flink.streaming.api.datastream.KeyedStream KeyedStream}.
+ *
+ * The stream with the broadcast state can be created using the
+ * {@link 
org.apache.flink.streaming.api.datastream.KeyedStream#broadcast(MapStateDescriptor)
+ * keyedStream.broadcast(MapStateDescriptor)} method.
+ *
+ * The user has to implement two methods:
+ * 
+ * the {@link #processElementOnBroadcastSide(Object, 
KeyedReadWriteContext, Collector)} which will be applied to
+ * each element in the broadcast side
+ *  and the {@link #processElement(Object, KeyedReadOnlyContext, 
Collector)} which will be applied to the
+ * non-broadcasted/keyed side.
+ * 
+ *
+ * The {@code processElementOnBroadcastSide()} takes as an argument 
(among others) a context that allows it to
+ * read/write to the broadcast state and also apply a transformation to 
all (local) keyed states, while the
+ * {@code processElement()} has read-only access to the broadcast state, 
but can read/write to the keyed state and
+ * register timers.
+ *
+ * @param  The key type of the input keyed stream.
+ * @param  The input type of the broadcast side.
+ * @param  The input type of the keyed (non-broadcast) side.
+ * @param  The key type of the elements in the {@link 
org.apache.flink.api.common.state.BroadcastState BroadcastState}.
+ * @param  The value type of the elements in the {@link 
org.apache.flink.api.common.state.BroadcastState BroadcastState}.
+ * @param  The output type of the operator.
+ */
+@PublicEvolving
+public abstract class KeyedBroadcastProcessFunction extends BaseBroadcastProcessFunction {
+
+   private static final long serialVersionUID = -2584726797564976453L;
+
+   /**
+* This method is called for each element in the
+* {@link org.apache.flink.streaming.api.datastream.BroadcastStream 
broadcast stream}.
+*
+* It can output zero or more elements using the {@link Collector} 
parameter,
+* query the current processing/event time, and also query and update 
the internal
+* {@link org.apache.flink.api.common.state.BroadcastState 
BroadcastState}. In addition, it
+* can register a {@link KeyedStateFunction function} to be applied to 
all keyed states on
+* the local partition. These can be done through the provided {@link 
ReadWriteContext}.
+* The context is only valid during the invocation of this method, do 
not store it.
+*
+* @param value The stream element.
+* @param ctx A {@link ReadWriteContext} that allows querying the 
timestamp of the element,
+*querying the current processing/event time and updating 
the broadcast state.
+*In addition, it allows the registration of a {@link 
KeyedStateFunction function}
+*to be 
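
(The quoted Javadoc is cut off here.) The read/write vs. read-only split it describes can be illustrated with a self-contained sketch that uses hypothetical interfaces instead of the PR's context classes: the broadcast side sees a read/write view of the broadcast state, while the keyed side only sees a read-only one.

    import java.util.Map;

    class BroadcastAccessSplitSketch {

        /** Read-only view handed to the keyed (non-broadcast) side. */
        interface ReadOnlyBroadcastView<K, V> {
            V get(K key);
        }

        /** Read/write view handed to the broadcast side. */
        interface BroadcastView<K, V> extends ReadOnlyBroadcastView<K, V> {
            void put(K key, V value);
        }

        static <K, V> BroadcastView<K, V> viewOf(Map<K, V> backing) {
            return new BroadcastView<K, V>() {
                @Override public V get(K key) { return backing.get(key); }
                @Override public void put(K key, V value) { backing.put(key, value); }
            };
        }
    }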

[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r159215700
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
 ---
@@ -42,10 +42,10 @@
@Override
public List repartitionState(
List previousParallelSubtaskStates,
-   int parallelism) {
--- End diff --

Some changes, like the variable renames and some simplifications, are pure 
refactorings. Those could/should go into a separate commit.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r159223446
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
 ---
@@ -0,0 +1,734 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import 
org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Function;
+
+/**
+ * Tests for the {@link CoBroadcastWithKeyedOperator}.
+ */
+public class CoBroadcastWithKeyedOperatorTest {
+
+   /** Test the iteration over the keyed state on the broadcast side. */
+   @Test
+   public void testAccessToKeyedStateIt() throws Exception {
+   final List<String> test1content = new ArrayList<>();
+   test1content.add("test1");
+   test1content.add("test1");
+
+   final List<String> test2content = new ArrayList<>();
+   test2content.add("test2");
+   test2content.add("test2");
+   test2content.add("test2");
+   test2content.add("test2");
+
+   final List<String> test3content = new ArrayList<>();
+   test3content.add("test3");
+   test3content.add("test3");
+   test3content.add("test3");
+
+   final Map<String, List<String>> expectedState = new HashMap<>();
+   expectedState.put("test1", test1content);
+   expectedState.put("test2", test2content);
+   expectedState.put("test3", test3content);
+
+   try (
+   AutoClosableTestHarness autoTestHarness = new 
AutoClosableTestHarness<>(
+   BasicTypeInfo.STRING_TYPE_INFO,
+   new IdentityKeySelector<>(),
+   new 
StatefulFunctionWithKeyedStateAccessedOnBroadcast(expectedState),
+   BasicTypeInfo.STRING_TYPE_INFO,
+   BasicTypeInfo.INT_TYPE_INFO)
+   ) {
+
+   TwoInputStreamOperatorTestHarness testHarness = autoTestHarness.getTestHarness();
+
+   // send elements to the keyed state
+   testHarness.processElement2(new StreamRecord<>("test1", 
12L));
+   

[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r159216046
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 ---
@@ -99,7 +99,7 @@
private final ExecutionConfig executionConfig;
 
/**
-* Decoratores the input and output streams to write key-groups 
compressed.
--- End diff --

Don't want to keep the Spanish name? 😉 


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r159222186
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
 ---
@@ -287,16 +339,60 @@ public void 
testOperatorStateMetaInfoReadSerializerFailureResilience() throws Ex

PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
 
try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
+   final DataInputViewStreamWrapper inWrapper = new 
DataInputViewStreamWrapper(in);
+
metaInfo = 
OperatorBackendStateMetaInfoSnapshotReaderWriters
-   
.getReaderForVersion(OperatorBackendSerializationProxy.VERSION, 
Thread.currentThread().getContextClassLoader())
--- End diff --

The test below does it, but why do you write it here as well?


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r159215876
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
@@ -613,7 +613,6 @@ private static void checkStateMappingCompleteness(
Map partitionOffsets =

operatorStateHandle.getStateNameToPartitionOffsets();
 
-
--- End diff --

Both changes should go in a separate cleanup/refactoring commit.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r159222356
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
 ---
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import 
org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator;
+import 
org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator;
+import 
org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.util.Preconditions;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A BroadcastConnectedStream represents the result of connecting a stream 
with
--- End diff --

I think this is from the time when the "broadcast connect" was the other 
way round. It also sounds a bit complicated and could be simplified, I think.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r159223798
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
 ---
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.BroadcastState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link TwoInputStreamOperator} for executing {@link 
KeyedBroadcastProcessFunction}.
+ *
+ * @param  The key type of the input keyed stream.
+ * @param  The input type of the broadcast side.
+ * @param  The input type of the keyed (non-broadcast) side.
+ * @param  The key type of the elements in the {@link BroadcastState}.
+ * @param  The value type of the elements in the {@link BroadcastState}.
+ * @param  The output type of the operator.
+ */
+@Internal
+public class CoBroadcastWithKeyedOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator, 
Triggerable {
+
+   private static final long serialVersionUID = 5926499536290284870L;
+
+   private final MapStateDescriptor broadcastStateDescriptor;
+
+   private transient TimestampedCollector collector;
+
+   private transient BroadcastState broadcastState;
+
+   private transient ReadWriteContextImpl rwContext;
+
+   private transient ReadOnlyContextImpl rContext;
+
+   private transient OnTimerContextImpl onTimerContext;
+
+   public CoBroadcastWithKeyedOperator(
+   final KeyedBroadcastProcessFunction function,
+   final MapStateDescriptor 
broadcastStateDescriptor) {
+   super(function);
+   this.broadcastStateDescriptor = 
Preconditions.checkNotNull(broadcastStateDescriptor);
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+
+   InternalTimerService internalTimerService =
+   getInternalTimerService("user-timers", 
VoidNamespaceSerializer.INSTANCE, this);
+
+   TimerService timerService = new 
SimpleTimerService(internalTimerService);
+
+   collector = new TimestampedCollector<>(output);
+   broadcastState = 

[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r159222137
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
 ---
@@ -287,16 +339,60 @@ public void 
testOperatorStateMetaInfoReadSerializerFailureResilience() throws Ex

PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
 
try (ByteArrayInputStreamWithPos in = new 
ByteArrayInputStreamWithPos(serialized)) {
+   final DataInputViewStreamWrapper inWrapper = new 
DataInputViewStreamWrapper(in);
+
metaInfo = 
OperatorBackendStateMetaInfoSnapshotReaderWriters
-   
.getReaderForVersion(OperatorBackendSerializationProxy.VERSION, 
Thread.currentThread().getContextClassLoader())
--- End diff --

This doesn't test reading the broadcast state, does it?


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-01-02 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/5230

[FLINK-8345] Add iterator of keyed state on broadcast side of connected 
streams.

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-] [component] Title of 
the pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink broadcast

Alternatively you can review and apply these changes as the patch at: