[GitHub] flink issue #6388: [FLINK-6222] Allow passing env variables to start scripts...

2018-07-24 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6388
  
LGTM, except for the one remaining indentation problem mentioned by @zentol.


---


[GitHub] flink pull request #6351: [FLINK-9862] [test] Extend general puropose DataSt...

2018-07-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6351#discussion_r204345031
  
--- Diff: 
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
 ---
@@ -197,6 +210,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
	for (KeyRangeStates keyRange : snapshotKeyRanges.get()) {
		keyRanges.add(keyRange);
	}
+
+	// let event time start from the max of all event time progress across subtasks in the last execution
+	for (Long lastEventTime : lastEventTimes.get()) {
+		monotonousEventTime = Math.max(monotonousEventTime, lastEventTime);
--- End diff --

Yes, I think it should be fine because it is just for the generator. Just 
wanted to double check 👍 


---


[GitHub] flink issue #6351: [FLINK-9862] [test] Extend general puropose DataStream te...

2018-07-23 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6351
  
Had one question. Otherwise LGTM 👍 


---


[GitHub] flink pull request #6351: [FLINK-9862] [test] Extend general puropose DataSt...

2018-07-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6351#discussion_r204336316
  
--- Diff: 
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
 ---
@@ -197,6 +210,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
	for (KeyRangeStates keyRange : snapshotKeyRanges.get()) {
		keyRanges.add(keyRange);
	}
+
+	// let event time start from the max of all event time progress across subtasks in the last execution
+	for (Long lastEventTime : lastEventTimes.get()) {
+		monotonousEventTime = Math.max(monotonousEventTime, lastEventTime);
--- End diff --

Or, you really need to track the watermark per key-group partition.


---


[GitHub] flink pull request #6351: [FLINK-9862] [test] Extend general puropose DataSt...

2018-07-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6351#discussion_r204336026
  
--- Diff: 
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
 ---
@@ -197,6 +210,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
	for (KeyRangeStates keyRange : snapshotKeyRanges.get()) {
		keyRanges.add(keyRange);
	}
+
+	// let event time start from the max of all event time progress across subtasks in the last execution
+	for (Long lastEventTime : lastEventTimes.get()) {
+		monotonousEventTime = Math.max(monotonousEventTime, lastEventTime);
--- End diff --

I wonder why we compute the event time as the max and not as the min, as we 
would usually do for a combined watermark. This is probably never rescaled 
anyway, but it still looks a bit suspicious.
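
As a side note, the distinction in question can be sketched in plain Java (illustrative names, not code from this PR): a combined watermark across several inputs or subtasks advances to the minimum of the individual watermarks, while a single monotonous generator clock that resumes from restored state continues from the maximum of the last event times so that it never moves backwards.

```
import java.util.Arrays;

public class WatermarkCombineSketch {

	// Combined watermark semantics: an operator may only advance its event time
	// to the minimum of the watermarks of all its inputs.
	static long combinedWatermark(long[] inputWatermarks) {
		return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
	}

	// Resuming one monotonous generator clock: continue from the maximum last
	// event time of the previous execution so generated timestamps never go back.
	static long resumeGeneratorClock(long[] lastEventTimes) {
		return Arrays.stream(lastEventTimes).max().orElse(0L);
	}

	public static void main(String[] args) {
		long[] times = {100L, 250L, 180L};
		System.out.println(combinedWatermark(times));    // 100
		System.out.println(resumeGeneratorClock(times)); // 250
	}
}
```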


---


[GitHub] flink issue #6376: [FLINK-9902][tests] Improve and refactor window checkpoin...

2018-07-20 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6376
  
Thanks for the fast review @aljoscha . Addressed the comment about the 
`while`-loop and will merge now.


---


[GitHub] flink pull request #6376: [FLINK-9902][tests] Improve and refactor window ch...

2018-07-20 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6376#discussion_r203982061
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Source for window checkpointing IT cases that can introduce artificial failures.
+ */
+public class FailingSource extends RichSourceFunction>
+	implements ListCheckpointed, CheckpointListener {
+
+	/**
+	 * Function to generate and emit the test events (and watermarks if required).
+	 */
+	@FunctionalInterface
+	public interface EventEmittingGenerator extends Serializable {
+		void emitEvent(SourceContext> ctx, int eventSequenceNo);
+	}
+
+	private static final long INITIAL = Long.MIN_VALUE;
+	private static final long STATEFUL_CHECKPOINT_COMPLETED = Long.MIN_VALUE;
+
+	@Nonnull
+	private final EventEmittingGenerator eventEmittingGenerator;
+	private final int expectedEmitCalls;
+	private final int failureAfterNumElements;
+	private final boolean usingProcessingTime;
+	private final AtomicLong checkpointStatus;
+
+	private int emitCallCount;
+	private volatile boolean running;
+
+	public FailingSource(
+		@Nonnull EventEmittingGenerator eventEmittingGenerator,
+		@Nonnegative int numberOfGeneratorInvocations) {
+		this(eventEmittingGenerator, numberOfGeneratorInvocations, TimeCharacteristic.EventTime);
+	}
+
+	public FailingSource(
+		@Nonnull EventEmittingGenerator eventEmittingGenerator,
+		@Nonnegative int numberOfGeneratorInvocations,
+		@Nonnull TimeCharacteristic timeCharacteristic) {
+		this.eventEmittingGenerator = eventEmittingGenerator;
+		this.running = true;
+		this.emitCallCount = 0;
+		this.expectedEmitCalls = numberOfGeneratorInvocations;
+		this.failureAfterNumElements = numberOfGeneratorInvocations / 2;
+		this.checkpointStatus = new AtomicLong(INITIAL);
+		this.usingProcessingTime = timeCharacteristic == TimeCharacteristic.ProcessingTime;
+	}
+
+	@Override
+	public void open(Configuration parameters) {
+		// non-parallel source
+		assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+	}
+
+	@Override
+	public void run(SourceContext> ctx) throws Exception {
+
+		final RuntimeContext runtimeContext = getRuntimeContext();
+		// detect if this task is "the chosen one" and should fail (via subtaskidx), if it did not fail before (via attempt)
+		final boolean failThisTask =
+			runtimeContext.getAttemptNumber() == 0 && runtimeContext.getIndexOfThisSubtask() == 0;
+
+		// we loop longer than we have elements, to permit delayed checkpoints
+		// to still cause a failure
+		while (running) {
+
+			// the function failed before

[GitHub] flink pull request #6376: [FLINK-9902][tests] Improve and refactor window ch...

2018-07-20 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6376#discussion_r203981882
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.test.util.SuccessException;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Generalized sink for validation of window checkpointing IT cases.
+ */
+public class ValidatingSink extends RichSinkFunction
+   implements ListCheckpointed> {
+
+   /**
+* Function to check if the window counts are as expected.
+*/
+   @FunctionalInterface
+   public interface ResultChecker extends Serializable {
+   boolean checkResult(Map windowCounts);
+   }
+
+   /**
+* Function that updates the window counts from an update event.
+*
+* @param  type of the update event.
+*/
+   public interface CountUpdater extends Serializable {
+   void updateCount(T update, Map windowCounts);
+   }
+
+   @Nonnull
+   private final ResultChecker resultChecker;
+
+   @Nonnull
+   private final CountUpdater countUpdater;
+
+   @Nonnull
+   private final HashMap windowCounts;
+
+   private final boolean usingProcessingTime;
+
+   public ValidatingSink(
+   @Nonnull CountUpdater countUpdater,
+   @Nonnull ResultChecker resultChecker) {
+   this(countUpdater, resultChecker, TimeCharacteristic.EventTime);
+   }
+
+   public ValidatingSink(
+   @Nonnull CountUpdater countUpdater,
+   @Nonnull ResultChecker resultChecker,
+   @Nonnull TimeCharacteristic timeCharacteristic) {
+
+   this.resultChecker = resultChecker;
+   this.countUpdater = countUpdater;
+		this.usingProcessingTime = TimeCharacteristic.ProcessingTime == timeCharacteristic;
+   this.windowCounts = new HashMap<>();
+   }
+
+   @Override
+   public void open(Configuration parameters) throws Exception {
+   // this sink can only work with DOP 1
+		assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+		if (usingProcessingTime && resultChecker.checkResult(windowCounts)) {
--- End diff --

Was also not 100% sure if this is needed. My reason was that in theory we 
could have restored from a checkpoint of a completed job that will not emit 
events, and we will not get into `close()` unless we somehow fire a 
`SuccessException`.


---


[GitHub] flink pull request #6376: [FLINK-9902][tests] Improve and refactor window ch...

2018-07-20 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6376#discussion_r203978845
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
 ---
@@ -92,19 +89,21 @@ private static Configuration getConfiguration() {
@Test
public void testTumblingProcessingTimeWindow() {
final int numElements = 3000;
-   FailingSource.reset();
 
try {
			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
			env.setParallelism(PARALLELISM);
			env.setStreamTimeCharacteristic(timeCharacteristic);
			env.getConfig().setAutoWatermarkInterval(10);
			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
--- End diff --

Yip, and it is also no longer required because we no longer work with 
`SuccessException` where it is not needed.


---


[GitHub] flink pull request #6376: [FLINK-9902][tests] Improve and refactor window ch...

2018-07-20 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6376#discussion_r203978691
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Source for window checkpointing IT cases that can introduce artificial failures.
+ */
+public class FailingSource extends RichSourceFunction>
+	implements ListCheckpointed, CheckpointListener {
+
+	/**
+	 * Function to generate and emit the test events (and watermarks if required).
+	 */
+	@FunctionalInterface
+	public interface EventEmittingGenerator extends Serializable {
+		void emitEvent(SourceContext> ctx, int eventSequenceNo);
+	}
+
+	private static final long INITIAL = Long.MIN_VALUE;
+	private static final long STATEFUL_CHECKPOINT_COMPLETED = Long.MIN_VALUE;
+
+	@Nonnull
+	private final EventEmittingGenerator eventEmittingGenerator;
+	private final int expectedEmitCalls;
+	private final int failureAfterNumElements;
+	private final boolean usingProcessingTime;
+	private final AtomicLong checkpointStatus;
+
+	private int emitCallCount;
+	private volatile boolean running;
+
+	public FailingSource(
+		@Nonnull EventEmittingGenerator eventEmittingGenerator,
+		@Nonnegative int numberOfGeneratorInvocations) {
+		this(eventEmittingGenerator, numberOfGeneratorInvocations, TimeCharacteristic.EventTime);
+	}
+
+	public FailingSource(
+		@Nonnull EventEmittingGenerator eventEmittingGenerator,
+		@Nonnegative int numberOfGeneratorInvocations,
+		@Nonnull TimeCharacteristic timeCharacteristic) {
+		this.eventEmittingGenerator = eventEmittingGenerator;
+		this.running = true;
+		this.emitCallCount = 0;
+		this.expectedEmitCalls = numberOfGeneratorInvocations;
+		this.failureAfterNumElements = numberOfGeneratorInvocations / 2;
+		this.checkpointStatus = new AtomicLong(INITIAL);
+		this.usingProcessingTime = timeCharacteristic == TimeCharacteristic.ProcessingTime;
+	}
+
+	@Override
+	public void open(Configuration parameters) {
+		// non-parallel source
+		assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks());
+	}
+
+	@Override
+	public void run(SourceContext> ctx) throws Exception {
+
+		final RuntimeContext runtimeContext = getRuntimeContext();
+		// detect if this task is "the chosen one" and should fail (via subtaskidx), if it did not fail before (via attempt)
+		final boolean failThisTask =
+			runtimeContext.getAttemptNumber() == 0 && runtimeContext.getIndexOfThisSubtask() == 0;
+
+		// we loop longer than we have elements, to permit delayed checkpoints
+		// to still cause a failure
+		while (running) {
+
+			// the function failed before

[GitHub] flink pull request #6376: [FLINK-9902][tests] Improve and refactor window ch...

2018-07-20 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6376#discussion_r203978368
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
 ---
@@ -88,20 +80,17 @@ public void testTumblingTimeWindow() {
final int numElementsPerKey = 3000;
final int windowSize = 100;
final int numKeys = 1;
-   FailingSource.reset();
 
		try {
			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
			env.setParallelism(PARALLELISM);
			env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
			env.enableCheckpointing(100);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
			env.getConfig().disableSysoutLogging();

			env
-				.addSource(new FailingSource(numKeys,
-					numElementsPerKey,
-					numElementsPerKey / 3))
+				.addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
--- End diff --

I would start duplicating them if there is divergence and they start to 
fail :-)


---


[GitHub] flink pull request #6376: [FLINK-9902][tests] Improve and refactor window ch...

2018-07-20 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6376#discussion_r203978136
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java
 ---
@@ -133,9 +122,12 @@ public void apply(
						out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
					}
				})
-				.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+				.addSink(new ValidatingSink<>(
+					new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+					new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize)))
+					.setParallelism(1);
 
-   tryExecute(env, "Tumbling Window Test");
+   env.execute("Tumbling Window Test");
--- End diff --

Same as above.


---


[GitHub] flink pull request #6376: [FLINK-9902][tests] Improve and refactor window ch...

2018-07-20 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6376#discussion_r203978068
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java
 ---
@@ -299,12 +288,17 @@ public void apply(
							sum += value.f1.value;
							key = value.f0;
						}
-						out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+
+						final Tuple4 result =
+							new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum));
+						out.collect(result);
					}
				})
-				.addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+				.addSink(new ValidatingSink<>(
+					new SinkValidatorUpdateFun(numElementsPerKey),
+					new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1);
 
-   tryExecute(env, "Tumbling Window Test");
+   env.execute("Tumbling Window Test");
--- End diff --

Because it does not require using `SuccessException`s: in event time, the end 
of the source function is deterministic.


---


[GitHub] flink issue #6361: [FLINK-9858] [tests] State TTL End-to-End Test

2018-07-20 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6361
  
Very good work, LGTM 👍 Merging.


---


[GitHub] flink pull request #6351: [FLINK-9862] [test] Extend general puropose DataSt...

2018-07-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6351#discussion_r203382388
  
--- Diff: 
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java
 ---
@@ -56,6 +56,7 @@ public void flatMap(Event event, Collector out) throws Exception {
if (validator.check(currentValue, nextValue)) {
sequenceValue.update(nextValue);
} else {
+   sequenceValue.update(nextValue);
--- End diff --

```
sequenceValue.update(nextValue);
if (!validator.check(currentValue, nextValue)) {
	out.collect("Alert: " + currentValue + " -> " + nextValue + " (" + event.getKey() + ")");
}
```


---


[GitHub] flink pull request #6351: [FLINK-9862] [test] Extend general puropose DataSt...

2018-07-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6351#discussion_r203380952
  
--- Diff: 
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
 ---
@@ -184,12 +189,16 @@
 
	private static final ConfigOption SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS = ConfigOptions
		.key("sequence_generator_source.event_time.max_out_of_order")
-		.defaultValue(500L);
+		.defaultValue(0L);

	private static final ConfigOption SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS = ConfigOptions
		.key("sequence_generator_source.event_time.clock_progress")
		.defaultValue(100L);

	private static final ConfigOption TUMBLING_WINDOW_OPERATOR_NUM_EVENTS = ConfigOptions
		.key("sliding_window_operator.num_events")
--- End diff --

`tumbling` instead of `sliding`?


---


[GitHub] flink pull request #6361: [FLINK-9858] [tests] State TTL End-to-End Test

2018-07-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6361#discussion_r203377098
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -240,6 +240,15 @@ function start_cluster {
   done
 }
 
+function start_taskmanagers {
--- End diff --

I think you could just reuse function `tm_watchdog` for this purpose.


---


[GitHub] flink pull request #6361: [FLINK-9858] [tests] State TTL End-to-End Test

2018-07-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6361#discussion_r203372004
  
--- Diff: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import javax.annotation.Nonnull;
+
+import java.io.Serializable;
+
+/** Contains relevant for state TTL update data. */
+public class TtlUpdateContext implements Serializable {
+   private final int key;
+
+   @Nonnull
+   private final String verifierId;
+
+   private final GV valueBeforeUpdate;
+   private final UV update;
+   private final GV updatedValue;
+
+   private final long timestamp;
+
+	public TtlUpdateContext(int key, @Nonnull String verifierId, GV valueBeforeUpdate, UV update, GV updatedValue) {
+		this(key, verifierId, valueBeforeUpdate, update, updatedValue, System.currentTimeMillis());
--- End diff --

This is recording the timestamp with some imprecision relative to the actual time 
that the TTL state saw. On the test server, this time difference can be 
significant and lead to flaky tests, even with your hardcoded imprecision. 
Instead, we could record a timestamp `t1` before accessing the TTL state and a 
timestamp `t2` after accessing it. All state with ttl < `t1` must clearly be 
expired, all state with ttl > `t2` must clearly still be there, and cases that fall 
between `t1` and `t2` can be ignored because we cannot clearly decide whether they 
saw a timestamp before or after their expiration when accessed.
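
A minimal sketch of this bracketing check (illustrative helper, not code from the PR; the method and parameter names are assumptions):

```
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

public class TtlBracketCheckSketch {

	/**
	 * @param lastUpdateTime timestamp of the last update of the TTL state
	 * @param ttlMillis      configured time-to-live in milliseconds
	 * @param t1             wall-clock time recorded just before accessing the state
	 * @param t2             wall-clock time recorded just after accessing the state
	 * @param observedValue  value returned by the access, null if the state was expired
	 */
	static void checkSample(long lastUpdateTime, long ttlMillis, long t1, long t2, Object observedValue) {
		final long expirationTime = lastUpdateTime + ttlMillis;
		if (expirationTime < t1) {
			// expired strictly before the access started -> must be gone
			assertNull(observedValue);
		} else if (expirationTime > t2) {
			// expires strictly after the access finished -> must still be visible
			assertNotNull(observedValue);
		}
		// expiration falls into [t1, t2]: undecidable, ignore this sample
	}
}
```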


---


[GitHub] flink pull request #6361: [FLINK-9858] [tests] State TTL End-to-End Test

2018-07-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6361#discussion_r203363708
  
--- Diff: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.tests.verify.TtlUpdateContext;
+
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.SEQUENCE_GENERATOR_SRC_KEYSPACE;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.SEQUENCE_GENERATOR_SRC_SLEEP_TIME;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+
+/**
+ * A test job for State TTL feature.
+ */
+public class DataStreamStateTTLTestProgram {
+   private static final ConfigOption STATE_TTL_VERIFIER_TTL_MILLI = ConfigOptions
--- End diff --

(also for the ones we take from the allround job.)


---


[GitHub] flink pull request #6361: [FLINK-9858] [tests] State TTL End-to-End Test

2018-07-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6361#discussion_r203362731
  
--- Diff: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java
 ---
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.tests.verify.TtlUpdateContext;
+
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.SEQUENCE_GENERATOR_SRC_KEYSPACE;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.SEQUENCE_GENERATOR_SRC_SLEEP_TIME;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
+
+/**
+ * A test job for State TTL feature.
+ */
+public class DataStreamStateTTLTestProgram {
+   private static final ConfigOption STATE_TTL_VERIFIER_TTL_MILLI = ConfigOptions
--- End diff --

Please add documentation for the config parameters to the class-level 
javadoc


---


[GitHub] flink pull request #6361: [FLINK-9858] [tests] State TTL End-to-End Test

2018-07-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6361#discussion_r203361200
  
--- Diff: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.IntStream;
+
+abstract class AbstractTtlStateVerifier, 
S extends State, SV, UV, GV>
+   implements TtlStateVerifier {
+   static final Random RANDOM = new Random();
+
+   @Nonnull
+   final D stateDesc;
+
+   AbstractTtlStateVerifier(@Nonnull D stateDesc) {
+   this.stateDesc = stateDesc;
+   }
+
+   @Nonnull
+   static String randomString() {
+   StringBuilder sb = new StringBuilder();
+   IntStream.range(0, RANDOM.nextInt(14) + 2).forEach(i -> 
sb.append(randomChar()));
+   return sb.toString();
+   }
+
+   private static char randomChar() {
+   char d = (char) ('0' + RANDOM.nextInt(9));
+   char l = (char) ('a' + RANDOM.nextInt(25));
+   return RANDOM.nextBoolean() ? d : l;
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   @Nonnull
+   public State createState(@Nonnull FunctionInitializationContext 
context, @Nonnull StateTtlConfiguration ttlConfig) {
+   stateDesc.enableTimeToLive(ttlConfig);
+   return createState(context);
+   }
+
+   abstract State createState(FunctionInitializationContext context);
+
+   @SuppressWarnings("unchecked")
+   @Override
+   @Nonnull
+   public TypeSerializer getUpdateSerializer() {
+   return (TypeSerializer) stateDesc.getSerializer();
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public GV get(@Nonnull State state) throws Exception {
+   return getInternal((S) state);
+   }
+
+   abstract GV getInternal(@Nonnull S state) throws Exception;
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public void update(@Nonnull State state, Object update) throws 
Exception {
+   updateInternal((S) state, (UV) update);
+   }
+
+   abstract void updateInternal(@Nonnull S state, UV update) throws 
Exception;
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public boolean verify(@Nonnull TtlVerificationContext verificationContextRaw, @Nonnull Time precision) {
--- End diff --

I think you can get rid of handling wildcards and casting by introducing the 
following method in `TtlVerifyFunction`:

```
private  void verify(TtlStateVerifier verifier, TtlVerificationContext verificationContext) {
	verifier.verify(verificationContext, precision);
}
```

and call it

```
verify(TtlStateVerifier.VERIFIERS_BY_NAME.get(value.getVerifierId()), new TtlVerificationContext<>(prevValues, value));
```


---


[GitHub] flink pull request #6361: [FLINK-9858] [tests] State TTL End-to-End Test

2018-07-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6361#discussion_r203355448
  
--- Diff: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.IntStream;
+
+abstract class AbstractTtlStateVerifier, 
S extends State, SV, UV, GV>
+   implements TtlStateVerifier {
+   static final Random RANDOM = new Random();
+
+   @Nonnull
+   final D stateDesc;
+
+   AbstractTtlStateVerifier(@Nonnull D stateDesc) {
+   this.stateDesc = stateDesc;
+   }
+
+   @Nonnull
+   static String randomString() {
+   StringBuilder sb = new StringBuilder();
+   IntStream.range(0, RANDOM.nextInt(14) + 2).forEach(i -> 
sb.append(randomChar()));
+   return sb.toString();
+   }
+
+   private static char randomChar() {
+   char d = (char) ('0' + RANDOM.nextInt(9));
+   char l = (char) ('a' + RANDOM.nextInt(25));
+   return RANDOM.nextBoolean() ? d : l;
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   @Nonnull
+   public State createState(@Nonnull FunctionInitializationContext 
context, @Nonnull StateTtlConfiguration ttlConfig) {
+   stateDesc.enableTimeToLive(ttlConfig);
+   return createState(context);
+   }
+
+   abstract State createState(FunctionInitializationContext context);
+
+   @SuppressWarnings("unchecked")
+   @Override
+   @Nonnull
+   public TypeSerializer getUpdateSerializer() {
+   return (TypeSerializer) stateDesc.getSerializer();
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public GV get(@Nonnull State state) throws Exception {
+   return getInternal((S) state);
+   }
+
+   abstract GV getInternal(@Nonnull S state) throws Exception;
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public void update(@Nonnull State state, Object update) throws 
Exception {
+   updateInternal((S) state, (UV) update);
+   }
+
+   abstract void updateInternal(@Nonnull S state, UV update) throws 
Exception;
+
+   @SuppressWarnings("unchecked")
+   @Override
+	public boolean verify(@Nonnull TtlVerificationContext verificationContextRaw, @Nonnull Time precision) {
+		TtlVerificationContext verificationContext = (TtlVerificationContext) verificationContextRaw;
+		if (!isWithinPrecision(verificationContext, precision)) {
+			return true;
+		}
+		List> updates = new ArrayList<>(verificationContext.getPrevUpdates());
+		long currentTimestamp = verificationContext.getUpdateContext().getTimestamp();
+		GV prevValue = expected(updates, currentTimestamp);
+		GV valueBeforeUpdate = verificationContext.getUpdateContext().getValueBeforeUpdate();
+		TtlValue update = verificationContext.getUpdateContext().getUpdateWithTs();
+		GV updatedValue = verificationContext.getUpdateContext().getUpdatedValue();
+		updates.add(update);
+		GV expectedValue = expected(updates, currentTimestamp);
+   

[GitHub] flink pull request #6361: [FLINK-9858] [tests] State TTL End-to-End Test

2018-07-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6361#discussion_r203353112
  
--- Diff: 
flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.verify;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfiguration;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.IntStream;
+
+abstract class AbstractTtlStateVerifier, 
S extends State, SV, UV, GV>
+   implements TtlStateVerifier {
+   static final Random RANDOM = new Random();
+
+   @Nonnull
+   final D stateDesc;
+
+   AbstractTtlStateVerifier(@Nonnull D stateDesc) {
+   this.stateDesc = stateDesc;
+   }
+
+   @Nonnull
+   static String randomString() {
--- End diff --

You could use the existing `StringUtil.getRandomString(...)` instead of 
your own random string generator.


---


[GitHub] flink issue #6302: [FLINK-9061][checkpointing] add entropy to s3 path for be...

2018-07-16 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6302
  
@indrc I think there is no contradiction between Stephan's comment and my 
suggestion about point 1. I stand by my point that this is a very common task and 
there is no special requirement in the random string that would really 
require a new method. My initial suggestions, `RandomStringUtils` and 
`UUID`, are both already available without adding another dependency, and a 
full-text search for `randomstring` in the project also gave some hits, 
e.g. `StringUtil`, that could be used or extended in a more general way 
instead of putting our own algorithm in this particular place.
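
For illustration, two readily available options mentioned above, sketched in plain Java (whether either fits the entropy-injection use case exactly is a separate question):

```
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

public class RandomStringSketch {

	// Option 1: a UUID from the JDK gives 32 hex characters plus dashes.
	static String uuidBased() {
		return UUID.randomUUID().toString();
	}

	// Option 2: a fixed-length alphanumeric string, in the spirit of
	// commons-lang's RandomStringUtils.randomAlphanumeric(length).
	static String alphanumeric(int length) {
		final String alphabet = "abcdefghijklmnopqrstuvwxyz0123456789";
		final StringBuilder sb = new StringBuilder(length);
		for (int i = 0; i < length; i++) {
			sb.append(alphabet.charAt(ThreadLocalRandom.current().nextInt(alphabet.length())));
		}
		return sb.toString();
	}
}
```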


---


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-15 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202556109
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java
 ---
@@ -80,13 +82,57 @@ public E peek() {
 
@Override
	public void bulkPoll(@Nonnull Predicate canConsume, @Nonnull Consumer consumer) {
+		if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) {
+			bulkPollRelaxedOrder(canConsume, consumer);
+		} else {
+			bulkPollStrictOrder(canConsume, consumer);
+		}
+	}
+
+	private void bulkPollRelaxedOrder(@Nonnull Predicate canConsume, @Nonnull Consumer consumer) {
+		if (orderedCache.isEmpty()) {
+			bulkPollStore(canConsume, consumer);
+		} else {
+			while (!orderedCache.isEmpty() && canConsume.test(orderedCache.peekFirst())) {
+				final E next = orderedCache.removeFirst();
+				orderedStore.remove(next);
+				consumer.accept(next);
+			}
+
+			if (orderedCache.isEmpty()) {
+				bulkPollStore(canConsume, consumer);
+			}
+		}
+	}
+
+	private void bulkPollStrictOrder(@Nonnull Predicate canConsume, @Nonnull Consumer consumer) {
		E element;
		while ((element = peek()) != null && canConsume.test(element)) {
			poll();
			consumer.accept(element);
		}
	}

+	private void bulkPollStore(@Nonnull Predicate canConsume, @Nonnull Consumer consumer) {
+		try (CloseableIterator iterator = orderedStore.orderedIterator()) {
+			while (iterator.hasNext()) {
+				final E next = iterator.next();
+				if (canConsume.test(next)) {
+					orderedStore.remove(next);
+					consumer.accept(next);
+				} else {
+					orderedCache.add(next);
+					while (iterator.hasNext() && !orderedCache.isFull()) {
+						orderedCache.add(iterator.next());
+					}
+					break;
+				}
+			}
+		} catch (Exception e) {
+			throw new FlinkRuntimeException("Exception while bulk polling store.", e);
--- End diff --

Why would you prefer it? I think there is no better way for the caller to 
handle problems in this call than failing the job (RocksDB problems).


---


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-15 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6333#discussion_r202556091
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -446,8 +485,10 @@ public String toString() {
@Override
public int numStateEntries() {
int sum = 0;
-		for (StateTable stateTable : stateTables.values()) {
-			sum += stateTable.size();
+		for (StateSnapshotRestore stateTable : registeredStates.values()) {
+			if (stateTable instanceof StateTable) {
+				sum += ((StateTable) stateTable).size();
+			}
--- End diff --

This method is only used for some tests, and to be on the safe side it is 
probably only expected to count the keyed state and not the timers.


---


[GitHub] flink issue #6333: [FLINK-9489] Checkpoint timers as part of managed keyed s...

2018-07-14 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6333
  
CC @tillrohrmann 


---


[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-14 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/6333

[FLINK-9489] Checkpoint timers as part of managed keyed state instead of 
raw keyed state

## What is the purpose of the change

This PR integrates priority queue state (timers) with the snapshotting of 
Flink's state backends and also already includes backwards compatibility 
(FLINK-9490). The core idea is to have a common abstraction for how state is 
registered in the state backend and how snapshots operate on such state 
(`StateSnapshotRestore`, `RegisteredStateMetaInfoBase`). With this, the new 
state integrates more or less seamlessly with the existing snapshot logic. The 
notable exception is the current lack of integration of the RocksDB state backend 
with heap-based priority queue state. This can currently still use the old 
snapshot code without causing any regression, via a temporary path (see 
`AbstractStreamOperator.snapshotState(...)`). As a result, after this PR Flink 
supports asynchronous snapshots for heap kv / heap queue, rocks kv / rocks 
queue (full and incremental), and rocks kv / heap queue (only full), and still uses 
synchronous snapshots for rocks kv / heap queue (only incremental).

This work was created in a bit of a rush to make it into the 1.6 release 
and still has some known rough edges that we could fix up in the test phase. 
Here is a list of some things that already come to my mind:

- Integrate heap-based timers with incremental RocksDB snapshots, then kick 
out some code.
- Check proper integration with serializer upgrade story (!!)
- After that, we can also remove the key-partitioning in the set structure 
from `HeapPriorityQueueSet`.
- Improve integration of the batch wrapper.
- Improve general state registration logic in the backends, there is 
potential to remove duplicated code, and generally still improve the 
integration of the queue state a bit.
- Improve performance of RocksDB based timers, e.g. a byte[] based cache, 
seeking exactly to the next potential timer instead of seeking to the key-group 
start, bulkPoll.
- Improve some class/interface/method names
- Add tests, e.g. bulkPoll etc.

## Verifying this change

This change is already covered by existing tests, but I would add some more 
eventually. You can activate RocksDB based timers by using the RocksDB backend 
and setting `RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE` to `ROCKS`.
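
A hedged sketch of what enabling this could look like; the option class is quoted from the description above, and the plain string key used here is only a placeholder assumption, not the actual configuration key:

```
import org.apache.flink.configuration.Configuration;

public class EnableRocksDbTimersSketch {

	public static void main(String[] args) {
		Configuration config = new Configuration();
		// Placeholder key standing in for RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE
		// as referenced in the PR description; the real key may differ.
		config.setString("state.backend.rocksdb.priority-queue-state-type", "ROCKS");
	}
}
```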

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (yes)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs only for now)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink pq-snapshot-integration

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6333.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6333


commit 1bb8f70700deacc49a4d4ac7900425c10272959d
Author: Stefan Richter 
Date:   2018-06-13T09:56:16Z

[FLINK-9489] Checkpoint timers as part of managed keyed state instead of 
raw keyed state

commit fc20df8268decab6d9890d617787a4084284b2f0
Author: Stefan Richter 
Date:   2018-07-13T23:19:30Z

Optimization for relaxed bulk polls

commit 4db1bea90fd6881555172fe3d22ee928e97894a7
Author: Stefan Richter 
Date:   2018-07-14T06:34:16Z

Renaming of some classes/interfaces




---


[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...

2018-07-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6325#discussion_r202295131
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1298,6 +1283,128 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  RegisteredKeyedBackendStateMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2> stateInfo) throws Exception {
+
+   @SuppressWarnings("unchecked")
+   RegisteredKeyedBackendStateMetaInfo.Snapshot 
restoredMetaInfoSnapshot =
+   (RegisteredKeyedBackendStateMetaInfo.Snapshot) 
restoredKvStateMetaInfos.get(
+   stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, 
stateDesc);
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyedBackendStateMetaInfo newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer);
+
+   // check compatibility results to determine if state migration 
is required
+   TypeSerializerSchemaCompatibility namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
+   namespaceSerializer);
+
+   TypeSerializerSchemaCompatibility stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
+   stateSerializer);
+
+   if (namespaceCompatibility.isIncompatible()) {
+   throw new UnsupportedOperationException(
+   "Changing the namespace TypeSerializer in an 
incompatible way is currently not supported.");
+   }
+
+   if (stateCompatibility.isIncompatible()) {
+   if 
(stateDesc.getType().equals(StateDescriptor.Type.MAP)) {
+   throw new UnsupportedOperationException(
+   "Changing the TypeSerializers of a 
MapState in an incompatible way is currently not supported.");
+   }
+
+   LOG.info(
+   "Performing state migration for state {} 
because the state serializer changed in an incompatible way.",
+   stateDesc);
+
+   // we need to get an actual state instance because 
migration is different
+   // for different state types. For example, ListState 
needs to deal with
+   // individual elements
+   StateFactory stateFactory = 
STATE_FACTORIES.get(stateDesc.getClass());
+   if (stateFactory == null) {
+   String message = String.format("State %s is not 
supported by %s",
+   stateDesc.getClass(), this.getClass());
+   throw new FlinkRuntimeException(message);
+   }
+
+   State state = stateFactory.createState(
+   stateDesc,
+   Tuple2.of(stateInfo.f0, newMetaInfo),
+   RocksDBKeyedStateBackend.this);
+
+   if (!(state instanceof AbstractRocksDBState)) {
+   throw new FlinkRuntimeException(
+   "State should be an 
AbstractRocksDBState but is " + state);
+   }
+
+			AbstractRocksDBState rocksDBState = (AbstractRocksDBState) state;
+
+   Snapshot rocksDBSnapshot = null;
+   RocksIteratorWrapper iterator = null;
+
+   try (ReadOptions readOptions = new ReadOptions();) {
+				// TODO: can I do this with try-with-resource or do I always have to call
+

[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...

2018-07-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6325#discussion_r202285840
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java
 ---
@@ -99,7 +99,8 @@
// TODO this method actually should not have a default 
implementation;
// TODO this placeholder should be removed as soon as all 
subclasses have a proper implementation in place, and
// TODO the method is properly integrated in state backends' 
restore procedures
-   throw new UnsupportedOperationException();
+// throw new UnsupportedOperationException();
--- End diff --

Remove this line.


---


[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...

2018-07-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6325#discussion_r202293686
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  RegisteredKeyedBackendStateMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2> stateInfo) throws Exception {
+
+   @SuppressWarnings("unchecked")
+   RegisteredKeyedBackendStateMetaInfo.Snapshot 
restoredMetaInfoSnapshot =
+   (RegisteredKeyedBackendStateMetaInfo.Snapshot) 
restoredKvStateMetaInfos.get(
+   stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, 
stateDesc);
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyedBackendStateMetaInfo newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer);
+
+   // check compatibility results to determine if state migration 
is required
+   TypeSerializerSchemaCompatibility namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
+   namespaceSerializer);
+
+   TypeSerializerSchemaCompatibility stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
+   stateSerializer);
+
+   if (namespaceCompatibility.isIncompatible()) {
+   throw new UnsupportedOperationException(
+   "Changing the namespace TypeSerializer in an 
incompatible way is currently not supported.");
+   }
+
+   if (stateCompatibility.isIncompatible()) {
+   if 
(stateDesc.getType().equals(StateDescriptor.Type.MAP)) {
+   throw new UnsupportedOperationException(
+   "Changing the TypeSerializers of a 
MapState in an incompatible way is currently not supported.");
+   }
+
+   LOG.info(
+   "Performing state migration for state {} 
because the state serializer changed in an incompatible way.",
+   stateDesc);
+
+   // we need to get an actual state instance because 
migration is different
+   // for different state types. For example, ListState 
needs to deal with
+   // individual elements
+   StateFactory stateFactory = 
STATE_FACTORIES.get(stateDesc.getClass());
+   if (stateFactory == null) {
+   String message = String.format("State %s is not 
supported by %s",
+   stateDesc.getClass(), this.getClass());
+   throw new FlinkRuntimeException(message);
+   }
+
+   State state = stateFactory.createState(
+   stateDesc,
+   Tuple2.of(stateInfo.f0, newMetaInfo),
+   RocksDBKeyedStateBackend.this);
+
+   if (!(state instanceof AbstractRocksDBState)) {
+   throw new FlinkRuntimeException(
+   "State should be an 
AbstractRocksDBState but is " + state);
+   }
+
+   AbstractRocksDBState rocksDBState = 
(AbstractRocksDBState) state;
--- End diff --

Avoid using raw types for the reference.


---


[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...

2018-07-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6325#discussion_r202293217
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  RegisteredKeyedBackendStateMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2> stateInfo) throws Exception {
+
+   @SuppressWarnings("unchecked")
+   RegisteredKeyedBackendStateMetaInfo.Snapshot 
restoredMetaInfoSnapshot =
+   (RegisteredKeyedBackendStateMetaInfo.Snapshot) 
restoredKvStateMetaInfos.get(
+   stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, 
stateDesc);
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyedBackendStateMetaInfo newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer);
+
+   // check compatibility results to determine if state migration 
is required
+   TypeSerializerSchemaCompatibility namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
+   namespaceSerializer);
+
+   TypeSerializerSchemaCompatibility stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
+   stateSerializer);
+
+   if (namespaceCompatibility.isIncompatible()) {
+   throw new UnsupportedOperationException(
+   "Changing the namespace TypeSerializer in an 
incompatible way is currently not supported.");
+   }
+
+   if (stateCompatibility.isIncompatible()) {
+   if 
(stateDesc.getType().equals(StateDescriptor.Type.MAP)) {
+   throw new UnsupportedOperationException(
+   "Changing the TypeSerializers of a 
MapState in an incompatible way is currently not supported.");
+   }
+
+   LOG.info(
+   "Performing state migration for state {} 
because the state serializer changed in an incompatible way.",
+   stateDesc);
+
+   // we need to get an actual state instance because 
migration is different
+   // for different state types. For example, ListState 
needs to deal with
+   // individual elements
+   StateFactory stateFactory = 
STATE_FACTORIES.get(stateDesc.getClass());
+   if (stateFactory == null) {
+   String message = String.format("State %s is not 
supported by %s",
+   stateDesc.getClass(), this.getClass());
+   throw new FlinkRuntimeException(message);
+   }
+
+   State state = stateFactory.createState(
+   stateDesc,
+   Tuple2.of(stateInfo.f0, newMetaInfo),
+   RocksDBKeyedStateBackend.this);
+
+   if (!(state instanceof AbstractRocksDBState)) {
+   throw new FlinkRuntimeException(
+   "State should be an 
AbstractRocksDBState but is " + state);
+   }
+
+   AbstractRocksDBState rocksDBState = 
(AbstractRocksDBState) state;
+
+   Snapshot rocksDBSnapshot = null;
+   RocksIteratorWrapper iterator = null;
+
+   try (ReadOptions readOptions = new ReadOptions();) {
--- End diff --

I would suggest trying this:
```
Snapshot rocksDBSnapshot = db.getSnapshot();
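
// (Sketch only, assuming the standard RocksDB Java API: a Snapshot is released
// via db.releaseSnapshot() rather than close(), so try-with-resources can cover
// the ReadOptions and the iterator but not the snapshot itself. stateInfo.f0 is
// assumed to be the column family handle.)
try (ReadOptions readOptions = new ReadOptions().setSnapshot(rocksDBSnapshot);
        RocksIteratorWrapper iterator =
                new RocksIteratorWrapper(db.newIterator(stateInfo.f0, readOptions))) {
        // ... migrate the entries here ...
} finally {
        db.releaseSnapshot(rocksDBSnapshot);
}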
  

[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...

2018-07-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6325#discussion_r202293477
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  RegisteredKeyedBackendStateMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2> stateInfo) throws Exception {
+
+   @SuppressWarnings("unchecked")
+   RegisteredKeyedBackendStateMetaInfo.Snapshot 
restoredMetaInfoSnapshot =
+   (RegisteredKeyedBackendStateMetaInfo.Snapshot) 
restoredKvStateMetaInfos.get(
+   stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, 
stateDesc);
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyedBackendStateMetaInfo newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer);
+
+   // check compatibility results to determine if state migration 
is required
+   TypeSerializerSchemaCompatibility namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
+   namespaceSerializer);
+
+   TypeSerializerSchemaCompatibility stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
+   stateSerializer);
+
+   if (namespaceCompatibility.isIncompatible()) {
+   throw new UnsupportedOperationException(
+   "Changing the namespace TypeSerializer in an 
incompatible way is currently not supported.");
+   }
+
+   if (stateCompatibility.isIncompatible()) {
+   if 
(stateDesc.getType().equals(StateDescriptor.Type.MAP)) {
+   throw new UnsupportedOperationException(
+   "Changing the TypeSerializers of a 
MapState in an incompatible way is currently not supported.");
+   }
+
+   LOG.info(
+   "Performing state migration for state {} 
because the state serializer changed in an incompatible way.",
+   stateDesc);
+
+   // we need to get an actual state instance because 
migration is different
+   // for different state types. For example, ListState 
needs to deal with
+   // individual elements
+   StateFactory stateFactory = 
STATE_FACTORIES.get(stateDesc.getClass());
+   if (stateFactory == null) {
+   String message = String.format("State %s is not 
supported by %s",
+   stateDesc.getClass(), this.getClass());
+   throw new FlinkRuntimeException(message);
+   }
+
+   State state = stateFactory.createState(
+   stateDesc,
+   Tuple2.of(stateInfo.f0, newMetaInfo),
+   RocksDBKeyedStateBackend.this);
+
+   if (!(state instanceof AbstractRocksDBState)) {
+   throw new FlinkRuntimeException(
+   "State should be an 
AbstractRocksDBState but is " + state);
+   }
+
+   AbstractRocksDBState rocksDBState = 
(AbstractRocksDBState) state;
+
+   Snapshot rocksDBSnapshot = null;
+   RocksIteratorWrapper iterator = null;
+
+   try (ReadOptions readOptions = new ReadOptions();) {
+   // TODO: can I do this with try-with-resource 
or do I always have to call
+

[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...

2018-07-13 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6325#discussion_r202289229
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1312,6 +1283,128 @@ private void copyStateDataHandleData(
return Tuple2.of(stateInfo.f0, newMetaInfo);
}
 
+   private  RegisteredKeyedBackendStateMetaInfo migrateStateIfNecessary(
+   StateDescriptor stateDesc,
+   TypeSerializer namespaceSerializer,
+   Tuple2> stateInfo) throws Exception {
+
+   @SuppressWarnings("unchecked")
+   RegisteredKeyedBackendStateMetaInfo.Snapshot 
restoredMetaInfoSnapshot =
+   (RegisteredKeyedBackendStateMetaInfo.Snapshot) 
restoredKvStateMetaInfos.get(
+   stateDesc.getName());
+
+   Preconditions.checkState(
+   restoredMetaInfoSnapshot != null,
+   "Requested to check compatibility of a restored 
RegisteredKeyedBackendStateMetaInfo," +
+   " but its corresponding restored snapshot 
cannot be found.");
+
+   StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, 
stateDesc);
+
+   TypeSerializer stateSerializer = stateDesc.getSerializer();
+
+   RegisteredKeyedBackendStateMetaInfo newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
+   stateDesc.getType(),
+   stateDesc.getName(),
+   namespaceSerializer,
+   stateSerializer);
+
+   // check compatibility results to determine if state migration 
is required
+   TypeSerializerSchemaCompatibility namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(),
+   namespaceSerializer);
+
+   TypeSerializerSchemaCompatibility stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
+   
restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(),
+   stateSerializer);
+
+   if (namespaceCompatibility.isIncompatible()) {
+   throw new UnsupportedOperationException(
+   "Changing the namespace TypeSerializer in an 
incompatible way is currently not supported.");
+   }
+
+   if (stateCompatibility.isIncompatible()) {
--- End diff --

The handling of this branch could maybe go into its own private method to 
break down this big monolithic method a bit.
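
Something along these lines (sketch only; the method name and signature are 
illustrative, not from the PR):

```java
// caller side: keep only the dispatch in the big method
if (stateCompatibility.isIncompatible()) {
    migrateIncompatibleState(stateDesc, stateInfo, newMetaInfo);
}

// ...

// extracted method: holds everything currently inside the isIncompatible() branch
private void migrateIncompatibleState(
        StateDescriptor<?, ?> stateDesc,
        Tuple2<ColumnFamilyHandle, ?> stateInfo,
        RegisteredKeyedBackendStateMetaInfo<?, ?> newMetaInfo) throws Exception {
    // reject MAP state, look up the StateFactory, create the state instance,
    // snapshot RocksDB, and rewrite the stored values with the new serializer.
}
```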


---


[GitHub] flink issue #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6313
  
LGTM, nice work! 👍 Besides one comment about closing the backends after the 
tests, the PR is ready. That is no big thing, so I will just fix it myself 
before merging.


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202130806
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
 ---
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.RunnableFuture;
+
+/** Base class for state backend test context. */
+public abstract class StateBackendTestContext {
+   private final StateBackend stateBackend;
+   private final CheckpointStorageLocation checkpointStorageLocation;
+   private final TtlTimeProvider timeProvider;
+
+   private AbstractKeyedStateBackend keyedStateBackend;
+
+   protected StateBackendTestContext(TtlTimeProvider timeProvider) {
+   this.timeProvider = Preconditions.checkNotNull(timeProvider);
+   this.stateBackend = 
Preconditions.checkNotNull(createStateBackend());
+   this.checkpointStorageLocation = 
createCheckpointStorageLocation();
+   }
+
+   protected abstract StateBackend createStateBackend();
+
+   private CheckpointStorageLocation createCheckpointStorageLocation() {
+   try {
+   return stateBackend
+   .createCheckpointStorage(new JobID())
+   .initializeLocationForCheckpoint(2L);
+   } catch (IOException e) {
+   throw new RuntimeException("unexpected");
+   }
+   }
+
+   void createAndRestoreKeyedStateBackend() {
+   Environment env = new DummyEnvironment();
+   try {
+   if (keyedStateBackend != null) {
+   keyedStateBackend.dispose();
--- End diff --

There is a problem: the backend is only disposed here and not after each test, 
which leads to some native errors when I run the tests. I suggest giving this 
context a `dispose` method and calling it in an `@After` method.
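
A minimal sketch of what I mean (names are illustrative, not the PR's actual code):

```java
// In StateBackendTestContext: expose explicit cleanup of the keyed backend.
void dispose() {
    if (keyedStateBackend != null) {
        keyedStateBackend.dispose();
        keyedStateBackend = null;
    }
}

// In the test class (org.junit.After): release the native resources after every test.
@After
public void disposeStateBackendTestContext() {
    if (stateBackendTestContext != null) {
        stateBackendTestContext.dispose();
    }
}
```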


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202103083
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -240,7 +243,7 @@ private boolean hasRegisteredState() {
}
 
@Override
-   public  IS createState(
+   public  IS createInternalState(
--- End diff --

No, timers cannot use state descriptors because they cannot extend `State`.


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202058484
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -240,7 +243,7 @@ private boolean hasRegisteredState() {
}
 
@Override
-   public  IS createState(
+   public  IS createInternalState(
--- End diff --

Why are we adding `Internal` here? I would suggest calling the method 
`create(Internal?)KeyValueState`, because there will also be other kinds of 
state in the future (timers). 


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202057089
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -93,7 +94,7 @@
private static final long serialVersionUID = -8191916350224044011L;
 
/** Maximum size of state that is stored with the metadata, rather than 
in files (1 MiByte). */
-   public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+   private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
--- End diff --

Ok, it is `PublicEvolving` so you can keep the change.


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202056626
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -93,7 +94,7 @@
private static final long serialVersionUID = -8191916350224044011L;
 
/** Maximum size of state that is stored with the metadata, rather than 
in files (1 MiByte). */
-   public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+   private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
--- End diff --

This should not be changed, because the class is user-facing API and 
someone might have used it.


---


[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6308#discussion_r202047581
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java
 ---
@@ -98,8 +103,7 @@ public int getVersion() {
 
@Override
public int[] getCompatibleVersions() {
-   // we are compatible with version 3 (Flink 1.3.x) and version 1 
& 2 (Flink 1.2.x)
-   return new int[] {VERSION, 3, 2, 1};
+   return new int[]{VERSION, 4, 3, 2, 1};
--- End diff --

Both styles are ok and used in Flink, so I will stick to this.


---


[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6308#discussion_r202047403
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java
 ---
@@ -126,11 +131,11 @@ public void 
testKeyedBackendSerializationProxyRoundtripWithSerializerSerializati
Assert.assertTrue(serializationProxy.getKeySerializer() 
instanceof UnloadableDummyTypeSerializer);
Assert.assertEquals(keySerializer.snapshotConfiguration(), 
serializationProxy.getKeySerializerConfigSnapshot());
 
-   for (RegisteredKeyedBackendStateMetaInfo.Snapshot meta : 
serializationProxy.getStateMetaInfoSnapshots()) {
-   Assert.assertTrue(meta.getNamespaceSerializer() 
instanceof UnloadableDummyTypeSerializer);
-   Assert.assertTrue(meta.getStateSerializer() instanceof 
UnloadableDummyTypeSerializer);
-   
Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), 
meta.getNamespaceSerializerConfigSnapshot());
-   
Assert.assertEquals(stateSerializer.snapshotConfiguration(), 
meta.getStateSerializerConfigSnapshot());
+   for (StateMetaInfoSnapshot meta : 
serializationProxy.getStateMetaInfoSnapshots()) {
--- End diff --

👍 Good point


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202042548
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java 
---
@@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception {
 
@Override
public Iterable> entries() throws Exception {
-   return entriesStream()::iterator;
+   return entries(e -> e);
}
 
-   private Stream> entriesStream() throws Exception {
+   private  Iterable entries(
+   Function, R> resultMapper) throws Exception {
Iterable>> withTs = 
original.entries();
-   withTs = withTs == null ? Collections.emptyList() : withTs;
-   return StreamSupport
-   .stream(withTs.spliterator(), false)
-   .filter(this::unexpiredAndUpdateOrCleanup)
-   .map(TtlMapState::unwrapWithoutTs);
-   }
-
-   private boolean unexpiredAndUpdateOrCleanup(Map.Entry> 
e) {
-   UV unexpiredValue;
-   try {
-   unexpiredValue = getWithTtlCheckAndUpdate(
-   e::getValue,
-   v -> original.put(e.getKey(), v),
-   () -> original.remove(e.getKey()));
-   } catch (Exception ex) {
-   throw new FlinkRuntimeException(ex);
-   }
-   return unexpiredValue != null;
-   }
-
-   private static  Map.Entry unwrapWithoutTs(Map.Entry> e) {
-   return new AbstractMap.SimpleEntry<>(e.getKey(), 
e.getValue().getUserValue());
+   return () -> new EntriesIterator<>(withTs == null ? 
Collections.emptyList() : withTs, resultMapper);
}
 
@Override
public Iterable keys() throws Exception {
-   return entriesStream().map(Map.Entry::getKey)::iterator;
+   return entries(Map.Entry::getKey);
}
 
@Override
public Iterable values() throws Exception {
-   return entriesStream().map(Map.Entry::getValue)::iterator;
+   return entries(Map.Entry::getValue);
}
 
@Override
public Iterator> iterator() throws Exception {
-   return entriesStream().iterator();
+   return entries().iterator();
}
 
@Override
public void clear() {
original.clear();
}
+
+   private class EntriesIterator implements Iterator {
+   private final Iterator>> 
originalIterator;
+   private final Function, R> resultMapper;
+   private Map.Entry nextUnexpired = null;
+   private boolean rightAfterNextIsCalled = false;
+
+   private EntriesIterator(
+   @Nonnull Iterable>> withTs,
+   @Nonnull Function, R> resultMapper) {
+   this.originalIterator = withTs.iterator();
+   this.resultMapper = resultMapper;
+   }
+
+   @Override
+   public boolean hasNext() {
+   rightAfterNextIsCalled = false;
+   while (nextUnexpired == null && 
originalIterator.hasNext()) {
+   nextUnexpired = 
getUnexpiredAndUpdateOrCleanup(originalIterator.next());
+   }
+   return nextUnexpired != null;
+   }
+
+   @Override
+   public R next() {
+   if (hasNext()) {
+   rightAfterNextIsCalled = true;
+   R result = resultMapper.apply(nextUnexpired);
+   nextUnexpired = null;
+   return result;
+   }
+   throw new NoSuchElementException();
+   }
+
+   @Override
+   public void remove() {
+   if (rightAfterNextIsCalled) {
--- End diff --

I agree, seems like there is no good solution for this.


---


[GitHub] flink issue #6308: [FLINK-9799] Generalize and unify state meta info snapsho...

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6308
  
@sihuazhou @azagrebin thanks guys for the fast reviews! Will address the 
comments and merge.


---


[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6308#discussion_r202041046
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java
 ---
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.metainfo;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Static factory that gives out the write and readers for different 
versions of {@link StateMetaInfoSnapshot}.
+ */
+public class StateMetaInfoSnapshotReadersWriters {
+
+   /**
+* Current version for the serialization format of {@link 
StateMetaInfoSnapshotReadersWriters}.
+* - v5: Flink 1.6.x
+*/
+   public static final int CURRENT_STATE_META_INFO_SNAPSHOT_VERSION = 5;
+
+   /**
+* Enum for backwards compatibility. This gives a hint about the 
expected state type for which a
+* {@link StateMetaInfoSnapshot} should be deserialized.
+*
+* TODO this can go away after we eventually drop backwards 
compatibility with all versions < 5.
+*/
+   public enum StateTypeHint {
+   KEYED_STATE,
+   OPERATOR_STATE
+   }
+
+   /**
+* Returns the writer for {@link StateMetaInfoSnapshot}.
+*/
+   @Nonnull
+   public static StateMetaInfoWriter getWriter() {
+   return CurrentWriterImpl.INSTANCE;
+   }
+
+   /**
+* Returns a reader for {@link StateMetaInfoSnapshot} with the 
requested state type and version number.
+*
+* @param readVersion the format version to read.
+* @param stateTypeHint a hint about the expected type to read.
+* @return the requested reader.
+*/
+   @Nonnull
+   public static StateMetaInfoReader getReader(int readVersion, @Nonnull 
StateTypeHint stateTypeHint) {
+
+   if (readVersion == CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
+   // latest version shortcut
+   return CurrentReaderImpl.INSTANCE;
+   }
+
+   if (readVersion > CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) {
+   throw new IllegalArgumentException("Unsupported read 
version for state meta info: " + readVersion);
+   }
+
+   switch (stateTypeHint) {
--- End diff --

👍 


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202036711
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java 
---
@@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception {
 
@Override
public Iterable> entries() throws Exception {
-   return entriesStream()::iterator;
+   return entries(e -> e);
}
 
-   private Stream> entriesStream() throws Exception {
+   private  Iterable entries(
+   Function, R> resultMapper) throws Exception {
Iterable>> withTs = 
original.entries();
-   withTs = withTs == null ? Collections.emptyList() : withTs;
-   return StreamSupport
-   .stream(withTs.spliterator(), false)
-   .filter(this::unexpiredAndUpdateOrCleanup)
-   .map(TtlMapState::unwrapWithoutTs);
-   }
-
-   private boolean unexpiredAndUpdateOrCleanup(Map.Entry> 
e) {
-   UV unexpiredValue;
-   try {
-   unexpiredValue = getWithTtlCheckAndUpdate(
-   e::getValue,
-   v -> original.put(e.getKey(), v),
-   () -> original.remove(e.getKey()));
-   } catch (Exception ex) {
-   throw new FlinkRuntimeException(ex);
-   }
-   return unexpiredValue != null;
-   }
-
-   private static  Map.Entry unwrapWithoutTs(Map.Entry> e) {
-   return new AbstractMap.SimpleEntry<>(e.getKey(), 
e.getValue().getUserValue());
+   return () -> new EntriesIterator<>(withTs == null ? 
Collections.emptyList() : withTs, resultMapper);
}
 
@Override
public Iterable keys() throws Exception {
-   return entriesStream().map(Map.Entry::getKey)::iterator;
+   return entries(Map.Entry::getKey);
}
 
@Override
public Iterable values() throws Exception {
-   return entriesStream().map(Map.Entry::getValue)::iterator;
+   return entries(Map.Entry::getValue);
}
 
@Override
public Iterator> iterator() throws Exception {
-   return entriesStream().iterator();
+   return entries().iterator();
}
 
@Override
public void clear() {
original.clear();
}
+
+   private class EntriesIterator implements Iterator {
+   private final Iterator>> 
originalIterator;
+   private final Function, R> resultMapper;
+   private Map.Entry nextUnexpired = null;
+   private boolean rightAfterNextIsCalled = false;
+
+   private EntriesIterator(
+   @Nonnull Iterable>> withTs,
+   @Nonnull Function, R> resultMapper) {
+   this.originalIterator = withTs.iterator();
+   this.resultMapper = resultMapper;
+   }
+
+   @Override
+   public boolean hasNext() {
+   rightAfterNextIsCalled = false;
+   while (nextUnexpired == null && 
originalIterator.hasNext()) {
+   nextUnexpired = 
getUnexpiredAndUpdateOrCleanup(originalIterator.next());
+   }
+   return nextUnexpired != null;
+   }
+
+   @Override
+   public R next() {
+   if (hasNext()) {
+   rightAfterNextIsCalled = true;
+   R result = resultMapper.apply(nextUnexpired);
+   nextUnexpired = null;
+   return result;
+   }
+   throw new NoSuchElementException();
+   }
+
+   @Override
+   public void remove() {
+   if (rightAfterNextIsCalled) {
--- End diff --

I think this is problematic, for example in the sequence `hasNext()`, 
`next()`, `hasNext()`, `remove()`, which is a valid interaction.
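
For illustration, a standard `ArrayList` iterator handles exactly this sequence; 
`remove()` after an intermediate `hasNext()` still deletes the element returned 
by the last `next()`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorRemoveAfterHasNext {
    public static void main(String[] args) {
        List<String> values = new ArrayList<>(Arrays.asList("a", "b", "c"));
        Iterator<String> it = values.iterator();

        String first = it.next(); // returns "a"
        it.hasNext();             // look-ahead is allowed here
        it.remove();              // must remove "a", the last element returned by next()

        System.out.println("removed " + first + ", remaining " + values); // [b, c]
    }
}
```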


---


[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors

2018-07-12 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6313#discussion_r202032512
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java 
---
@@ -92,6 +93,10 @@
@Nullable
private String queryableStateName;
 
+   /** Name for queries against state created from this StateDescriptor. */
+   @Nullable
+   private StateTtlConfiguration ttlConfig;
--- End diff --

I would suggest preferring `@Nonnull` and a `StateTtlConfiguration` instance that 
represents disabled TTL, so that the getter will never return `null` and calling 
code can drop the `null` checks.
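
For illustration, the disabled-default pattern could look roughly like this (just 
the shape, not the PR's actual class):

```java
// A config object with an explicit "disabled" instance, so the descriptor's
// ttlConfig field can be @Nonnull and never needs a null check.
public class StateTtlConfiguration {

    private static final StateTtlConfiguration DISABLED = new StateTtlConfiguration(false);

    private final boolean enabled;

    private StateTtlConfiguration(boolean enabled) {
        this.enabled = enabled;
    }

    public static StateTtlConfiguration disabled() {
        return DISABLED;
    }

    public boolean isEnabled() {
        return enabled;
    }
}
```

The descriptor field could then be initialized with `StateTtlConfiguration.disabled()` 
and annotated `@Nonnull`.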


---


[GitHub] flink issue #6277: [FLINK-9511] Implement TTL config

2018-07-11 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6277
  
LGTM 👍 merging.


---


[GitHub] flink issue #6308: [FLINK-9799] Generalize and unify state meta info snapsho...

2018-07-11 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6308
  
CC @azagrebin 


---


[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...

2018-07-11 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/6308

[FLINK-9799] Generalize and unify state meta info snapshot

## What is the purpose of the change

This PR generalizes and unifies the de/serialization of state meta 
information in backends. We replace the snapshots and reader/writers of the 
individual state types with a general `StateMetaInfoSnapshot` and the 
corresponding `StateMetaInfoSnapshotReadersWriters`. Backwards compatibility is 
maintained.

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink 
FLINK-9799-generalize-state-meta-pr

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6308


commit 5e44f759342793f4532e99f8df589c2416402176
Author: Stefan Richter 
Date:   2018-07-11T09:11:11Z

[FLINK-9799][state] Generalize and unify state meta infos




---


[GitHub] flink issue #6276: [FLINK-9486] Introduce TimerState in keyed state backend

2018-07-09 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6276
  
@tillrohrmann Thanks for the fast review. Merging.


---


[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

2018-07-09 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6276#discussion_r201013659
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java
 ---
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+import java.util.Set;
+
+/**
+ *
+ */
+@Nonnull
+public interface KeyGroupedInternalPriorityQueue extends 
InternalPriorityQueue {
+   Set getSubsetForKeyGroup(int keyGroupId);
--- End diff --

They are all added in one of the later commits.


---


[GitHub] flink issue #6275: [FLINK-9776] [runtime] Stop sending periodic interrupts o...

2018-07-09 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6275
  
LGTM 👍 


---


[GitHub] flink pull request #6275: [FLINK-9776] [runtime] Stop sending periodic inter...

2018-07-09 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6275#discussion_r200911028
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1563,7 +1573,7 @@ public void run() {
 
// log stack trace where the executing thread 
is stuck and
// interrupt the running thread periodically 
while it is still alive
-   while (executerThread.isAlive()) {
+   while (task.shouldInterruptOnCancel() && 
executerThread.isAlive()) {
--- End diff --

Ok, if the intention is improvement and not 100% certainty, then this is 
perfectly ok.


---


[GitHub] flink pull request #6275: [FLINK-9776] [runtime] Stop sending periodic inter...

2018-07-06 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6275#discussion_r200737014
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1563,7 +1573,7 @@ public void run() {
 
// log stack trace where the executing thread 
is stuck and
// interrupt the running thread periodically 
while it is still alive
-   while (executerThread.isAlive()) {
+   while (task.shouldInterruptOnCancel() && 
executerThread.isAlive()) {
--- End diff --

I think that an atomic boolean might be required. This check can pass, and by the 
time the interrupt is delivered the stream task might already have gone into the 
shutdown code, so the interrupt might still slip through?
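
Roughly what I have in mind, as a sketch (it uses a small lock-based handshake 
rather than a bare `AtomicBoolean`, because the flag flip and the interrupt call 
need to be mutually exclusive; all names are made up):

```java
// Illustrative only: a guard shared between the canceller/watchdog thread and
// the task thread, closing the window in which an interrupt could hit shutdown code.
final class InterruptGuard {

    private final Object lock = new Object();
    private boolean interruptible = true; // guarded by lock

    /** Called by the canceller/watchdog thread instead of a bare interrupt(). */
    void tryInterrupt(Thread executerThread) {
        synchronized (lock) {
            if (interruptible && executerThread.isAlive()) {
                executerThread.interrupt();
            }
        }
    }

    /** Called by the task thread right before it enters the shutdown code. */
    void disableInterrupts() {
        synchronized (lock) {
            interruptible = false;
        }
        // clear an interrupt that may have been delivered just before the flip
        Thread.interrupted();
    }
}
```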


---


[GitHub] flink issue #6276: [FLINK-9486] Introduce TimerState in keyed state backend

2018-07-06 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6276
  
CC @tillrohrmann 


---


[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...

2018-07-06 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/6276

[FLINK-9486] Introduce TimerState in keyed state backend

## What is the purpose of the change

This PR integrates `InternalTimerQueue` with keyed state backends (Heap and 
RocksDB), so that we can use the RocksDB-based version in the job for the first 
time. 

We introduce the interface `KeyGroupPartitionedPriorityQueue` as an easy 
adapter to the existing snapshotting code. This can probably be removed once the 
queues are fully integrated with the backend's snapshotting, in a follow-up PR. 

The PR also addresses an issue with the `TreeOrderedCache`, which requires a 
"proper" `Comparator` (implemented in `TieBreakingPriorityComparator`), and we 
introduce `PriorityComparator` to give more emphasis to this difference. 
`TieBreakingPriorityComparator` is likely to also go away once we come up with 
an improved caching approach that is not simply based on a tree.

We introduce `PriorityQueueSetFactory` to the keyed state backends, and 
this is where the queues are built. The current RocksDB implementation uses 
an additional RocksDB instance until we are fully integrated with backend 
snapshotting, because we are otherwise running into trouble with incremental 
snapshots.

A configuration parameter is introduced to choose the implementation of the 
queues for RocksDB; the default is still the heap variant for now.

Finally, we introduce an additional method for bulk polling in the 
`InternalTimerQueue` interface that opens up future optimizations.

## Verifying this change

This change is already covered by existing tests, such as 
`AbstractEventTimeWindowCheckpointingITCase`, but you would currently need to 
activate it via 
`RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (yes, if 
activated)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink 
integrateSetStateWithBackends

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6276.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6276


commit 0d8743e52a658876425b6cef03fef3fef2d09deb
Author: Stefan Richter 
Date:   2018-07-04T11:43:49Z

Remove read options from RocksDBOrderedSetStore

commit 84b1b36357322cf23d50396cbfa0725db95797ea
Author: Stefan Richter 
Date:   2018-07-04T11:51:14Z

Introduce (temporary?/visible for testing) KeyGroupPartitionedPriorityQueue 
interface to work with the existing snapshotting

commit 35e02705f6740854ae18a92b5a6dfbafd3201a8f
Author: Stefan Richter 
Date:   2018-07-04T16:07:54Z

Basic integration with backends / make Rocks timers work

commit 1294ac356162430cf9de86980de1d4a0154f33b8
Author: Stefan Richter 
Date:   2018-07-05T16:46:34Z

Introduce PriorityComparator and tie breaking variant as adapter to 
collections that require a comparator.

This is required because the tree set that we use in the cache expects that 
Comparators are aligned with Object#equals

commit bfd3a12e77348a79c91656d80a7a67ece9825103
Author: Stefan Richter 
Date:   2018-07-05T19:35:08Z

Iterator directly from cache if no store-only elements.

commit fbf26f1f2efbe1e2029d09d297808e26e08b87d8
Author: Stefan Richter 
Date:   2018-07-06T08:22:49Z

Use a dedicated RocksDB instance for priority queue state. We can revert
this once priority queue state is properly integrated with the
snapshotting. Until then, we must isolate the priority queue state in
a separate db or else incremental checkpoints will break.

commit 75cb05ab21e07eaed25e1cac048919f7f313b3f6
Author: Stefan Richter 
Date:   2018-07-06T13:55:02Z

Configuration part

commit 7a86e268072ec4ad9d9fae2fa8e852b66d4424a8
Author: Stefan Richter 
Date:   2018-07-06T14:48:53Z

Introduce bulk poll method in queue to open up future optimizations




---


[GitHub] flink issue #6251: [FLINK-9693] Set Execution#taskRestore to null after depl...

2018-07-04 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6251
  
LGTM 👍 


---


[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...

2018-07-04 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6196
  
I found there is still a small issue with the equals/hashCode but will just 
fix it before merging.


---


[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...

2018-07-04 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6196
  
LGTM 👍 merging.


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199894398
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -272,4 +254,60 @@ public int getVersion() {
previousSerializersAndConfigs.get(index).f0, 
UnloadableDummyTypeSerializer.class,
previousSerializersAndConfigs.get(index).f1, 
fieldSerializers[index]);
}
+
+   /** This class holds composite serializer parameters which can be 
precomputed in advanced for better performance. */
+   protected static class PrecomputedParameters implements Serializable {
+   /** Whether target type is immutable. */
+   final boolean immutableTargetType;
+
+   /** Whether target type and its fields are immutable. */
+   final boolean immutable;
+
+   /** Byte length of target object in serialized form. */
+   private final int length;
+
+   /** Whether any field serializer is stateful. */
+   final boolean stateful;
+
+   final int hashCode;
--- End diff --

I wonder if this should be `transient` in a serializable class; the hash 
code could also be based on object identity.
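
A small sketch of the `transient` variant, purely illustrative (the field names 
do not match the PR):

```java
import java.io.Serializable;
import java.util.Objects;

// The cached hash is not serialized and is recomputed lazily after deserialization.
class PrecomputedParametersSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    final boolean immutable;
    final int length;

    private transient int cachedHashCode; // 0 means "not computed yet"

    PrecomputedParametersSketch(boolean immutable, int length) {
        this.immutable = immutable;
        this.length = length;
    }

    @Override
    public int hashCode() {
        if (cachedHashCode == 0) {
            // may recompute if the hash happens to be 0, which is harmless
            cachedHashCode = Objects.hash(immutable, length);
        }
        return cachedHashCode;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PrecomputedParametersSketch)) {
            return false;
        }
        PrecomputedParametersSketch that = (PrecomputedParametersSketch) o;
        return immutable == that.immutable && length == that.length;
    }
}
```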


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199894217
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -272,4 +254,60 @@ public int getVersion() {
previousSerializersAndConfigs.get(index).f0, 
UnloadableDummyTypeSerializer.class,
previousSerializersAndConfigs.get(index).f1, 
fieldSerializers[index]);
}
+
+   /** This class holds composite serializer parameters which can be 
precomputed in advanced for better performance. */
+   protected static class PrecomputedParameters implements Serializable {
--- End diff --

If this is serializable, we should add a `serialVersionUID`.
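
That is, the usual boilerplate (the concrete value is up to the author):

```java
private static final long serialVersionUID = 1L;
```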


---


[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6228
  
@sihuazhou @azagrebin thanks for the reviews! I will merge this once my 
travis is green.


---


[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199841799
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
 ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
--- End diff --

good point 👍 


---


[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199817513
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
 ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+   protected static final KeyGroupRange KEY_GROUP_RANGE = new 
KeyGroupRange(0, 2);
+   protected static final KeyExtractorFunction 
KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+   protected static final Comparator TEST_ELEMENT_COMPARATOR =
+   
Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+   protected static void insertRandomTimers(
+   @Nonnull InternalPriorityQueue priorityQueue,
+   @Nonnull Set checkSet,
+   int count) {
+
+   ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+   final int numUniqueKeys = Math.max(count / 4, 64);
+
+   long duplicatePriority = Long.MIN_VALUE;
+
+   for (int i = 0; i < count; ++i) {
+   TestElement element;
+   do {
+   long elementPriority;
+   if (duplicatePriority == Long.MIN_VALUE) {
+   elementPriority = 
localRandom.nextLong();
+   } else {
+   elementPriority = duplicatePriority;
+   duplicatePriority = Long.MIN_VALUE;
+   }
+   element = new 
TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+   } while (!checkSet.add(element));
+
+   if (localRandom.nextInt(10) == 0) {
+   duplicatePriority = element.getPriority();
+   }
+
+   final boolean headChangedIndicated = 
priorityQueue.add(element);
+   if (element.equals(priorityQueue.peek())) {
+   Assert.assertTrue(headChangedIndicated);
+   }
+   }
+   Assert.assertEquals(count, priorityQueue.size());
+   }
+
+   @Test
+   public void testPeekPollOrder() {
+   final int initialCapacity = 4;
+   final int testSize = 1000;
+   InternalPriorityQueue priorityQueue =
+   newPriorityQueue(initialCapacity);
+   HashSet checkSet = new HashSet<>(testSize);
+
+   insertRandomTimers(priorityQueue, checkSet, testSize);
+
+   long lastPriorityValue = Long.MIN_VALUE;
+   int 

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199817285
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
 ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+   protected static final KeyGroupRange KEY_GROUP_RANGE = new 
KeyGroupRange(0, 2);
+   protected static final KeyExtractorFunction 
KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+   protected static final Comparator TEST_ELEMENT_COMPARATOR =
+   
Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+   protected static void insertRandomTimers(
+   @Nonnull InternalPriorityQueue priorityQueue,
+   @Nonnull Set checkSet,
+   int count) {
+
+   ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+   final int numUniqueKeys = Math.max(count / 4, 64);
+
+   long duplicatePriority = Long.MIN_VALUE;
+
+   for (int i = 0; i < count; ++i) {
+   TestElement element;
+   do {
+   long elementPriority;
+   if (duplicatePriority == Long.MIN_VALUE) {
+   elementPriority = 
localRandom.nextLong();
+   } else {
+   elementPriority = duplicatePriority;
+   duplicatePriority = Long.MIN_VALUE;
+   }
+   element = new 
TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+   } while (!checkSet.add(element));
+
+   if (localRandom.nextInt(10) == 0) {
+   duplicatePriority = element.getPriority();
+   }
+
+   final boolean headChangedIndicated = 
priorityQueue.add(element);
+   if (element.equals(priorityQueue.peek())) {
+   Assert.assertTrue(headChangedIndicated);
+   }
+   }
+   Assert.assertEquals(count, priorityQueue.size());
+   }
+
+   @Test
+   public void testPeekPollOrder() {
+   final int initialCapacity = 4;
+   final int testSize = 1000;
+   InternalPriorityQueue priorityQueue =
+   newPriorityQueue(initialCapacity);
+   HashSet checkSet = new HashSet<>(testSize);
+
+   insertRandomTimers(priorityQueue, checkSet, testSize);
+
+   long lastPriorityValue = Long.MIN_VALUE;
+   int 

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199817158
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
 ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+   protected static final KeyGroupRange KEY_GROUP_RANGE = new 
KeyGroupRange(0, 2);
+   protected static final KeyExtractorFunction 
KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+   protected static final Comparator TEST_ELEMENT_COMPARATOR =
+   
Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+   protected static void insertRandomTimers(
+   @Nonnull InternalPriorityQueue priorityQueue,
+   @Nonnull Set checkSet,
+   int count) {
+
+   ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+   final int numUniqueKeys = Math.max(count / 4, 64);
+
+   long duplicatePriority = Long.MIN_VALUE;
+
+   for (int i = 0; i < count; ++i) {
+   TestElement element;
+   do {
+   long elementPriority;
+   if (duplicatePriority == Long.MIN_VALUE) {
+   elementPriority = 
localRandom.nextLong();
+   } else {
+   elementPriority = duplicatePriority;
+   duplicatePriority = Long.MIN_VALUE;
+   }
+   element = new 
TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+   } while (!checkSet.add(element));
+
+   if (localRandom.nextInt(10) == 0) {
+   duplicatePriority = element.getPriority();
+   }
+
+   final boolean headChangedIndicated = 
priorityQueue.add(element);
+   if (element.equals(priorityQueue.peek())) {
+   Assert.assertTrue(headChangedIndicated);
+   }
+   }
+   Assert.assertEquals(count, priorityQueue.size());
+   }
+
+   @Test
+   public void testPeekPollOrder() {
+   final int initialCapacity = 4;
+   final int testSize = 1000;
+   InternalPriorityQueue priorityQueue =
+   newPriorityQueue(initialCapacity);
+   HashSet checkSet = new HashSet<>(testSize);
+
+   insertRandomTimers(priorityQueue, checkSet, testSize);
+
+   long lastPriorityValue = Long.MIN_VALUE;
+   int 

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199816750
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
 ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+   protected static final KeyGroupRange KEY_GROUP_RANGE = new 
KeyGroupRange(0, 2);
+   protected static final KeyExtractorFunction 
KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+   protected static final Comparator TEST_ELEMENT_COMPARATOR =
+   
Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+   protected static void insertRandomTimers(
+   @Nonnull InternalPriorityQueue priorityQueue,
+   @Nonnull Set checkSet,
+   int count) {
+
+   ThreadLocalRandom localRandom = ThreadLocalRandom.current();
+
+   final int numUniqueKeys = Math.max(count / 4, 64);
+
+   long duplicatePriority = Long.MIN_VALUE;
+
+   for (int i = 0; i < count; ++i) {
+   TestElement element;
+   do {
+   long elementPriority;
+   if (duplicatePriority == Long.MIN_VALUE) {
+   elementPriority = 
localRandom.nextLong();
+   } else {
+   elementPriority = duplicatePriority;
+   duplicatePriority = Long.MIN_VALUE;
+   }
+   element = new 
TestElement(localRandom.nextInt(numUniqueKeys), elementPriority);
+   } while (!checkSet.add(element));
+
+   if (localRandom.nextInt(10) == 0) {
+   duplicatePriority = element.getPriority();
+   }
+
+   final boolean headChangedIndicated = 
priorityQueue.add(element);
+   if (element.equals(priorityQueue.peek())) {
+   Assert.assertTrue(headChangedIndicated);
+   }
+   }
+   Assert.assertEquals(count, priorityQueue.size());
+   }
+
+   @Test
+   public void testPeekPollOrder() {
+   final int initialCapacity = 4;
+   final int testSize = 1000;
+   InternalPriorityQueue priorityQueue =
+   newPriorityQueue(initialCapacity);
+   HashSet checkSet = new HashSet<>(testSize);
+
+   insertRandomTimers(priorityQueue, checkSet, testSize);
+
+   long lastPriorityValue = Long.MIN_VALUE;
+   int 

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199816552
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java
 ---
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Testbase for implementations of {@link InternalPriorityQueue}.
+ */
+public abstract class InternalPriorityQueueTestBase extends TestLogger {
+
+   protected static final KeyGroupRange KEY_GROUP_RANGE = new 
KeyGroupRange(0, 2);
+   protected static final KeyExtractorFunction 
KEY_EXTRACTOR_FUNCTION = TestElement::getKey;
+   protected static final Comparator TEST_ELEMENT_COMPARATOR =
+   
Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey);
+
+   protected static void insertRandomTimers(
--- End diff --

👍 


---


[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199816139
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
 ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+/**
+ * This implementation of {@link InternalPriorityQueue} is internally 
partitioned into sub-queues per key-group and
+ * essentially works as a heap-of-heaps. Instances will have set semantics 
for elements if the sub-queues have set
+ * semantics.
+ *
+ * @param <T> the type of elements in the queue.
+ * @param <PQ> the type of sub-queue used for each key-group partition.
+ */
+public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
+   implements InternalPriorityQueue<T> {
+
+   /** A heap of heap sets. Each sub-heap represents the partition for a 
key-group.*/
+   @Nonnull
+   private final HeapPriorityQueue keyGroupHeap;
+
+   /** All elements from keyGroupHeap, indexed by their key-group id, 
relative to firstKeyGroup. */
+   @Nonnull
+   private final PQ[] keyGroupLists;
+
+   /** Function to extract the key from contained elements. */
+   @Nonnull
+   private final KeyExtractorFunction keyExtractor;
+
+   /** The total number of key-groups (in the job). */
+   @Nonnegative
+   private final int totalKeyGroups;
+
+   /** The smallest key-group id with a subpartition managed by this 
ordered set. */
+   @Nonnegative
+   private final int firstKeyGroup;
+
+   @SuppressWarnings("unchecked")
+   public KeyGroupPartitionedPriorityQueue(
+   @Nonnull KeyExtractorFunction keyExtractor,
+   @Nonnull Comparator elementComparator,
+   @Nonnull PartitionQueueSetFactory orderedCacheFactory,
+   @Nonnull KeyGroupRange keyGroupRange,
+   @Nonnegative int totalKeyGroups) {
+
+   this.keyExtractor = keyExtractor;
+   this.totalKeyGroups = totalKeyGroups;
+   this.firstKeyGroup = keyGroupRange.getStartKeyGroup();
+   this.keyGroupLists = (PQ[]) new 
InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()];
+   this.keyGroupHeap = new HeapPriorityQueue<>(
+   new 
InternalPriorityQueueComparator<>(elementComparator),
+   keyGroupRange.getNumberOfKeyGroups());
+   for (int i = 0; i < keyGroupLists.length; i++) {
+   final PQ keyGroupCache =
+   orderedCacheFactory.create(firstKeyGroup + i, 
totalKeyGroups, elementComparator);
+   keyGroupLists[i] = keyGroupCache;
+   keyGroupHeap.add(keyGroupCache);
+   }
+   }
+
+   @Nullable
+   @Override
+   public T poll() {
+   final PQ headList = keyGroupHeap.peek();
+   final T head = headList.poll();
+   keyGroupHeap.adjustModifiedElement(headList);
+   return head;
+   }
+
+   @Nullable
+   @Override
+   public T peek() {
+   return keyGroupHeap.peek().peek();
+   }
+
+   @Override
+   public boolean add(@Nonnull T toAdd) {
+   final PQ list = getListForElementKeyGroup(toAdd);
+
+   // the branch c
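The quoted diff breaks off inside add(). As a rough sketch, the remainder of such
a method can look like the following; this is an assumption based on the
surrounding quote (getListForElementKeyGroup, keyGroupHeap, peek()), not
necessarily the PR's exact code:

        // ... the branch checks whether the insert changed the head of the element's
        // sub-queue; only then can the head of the whole heap-of-heaps have changed
        if (list.add(toAdd)) {
            // re-establish the order of the outer heap for the modified sub-queue
            keyGroupHeap.adjustModifiedElement(list);
            // report a (possible) head change, as the add() contract requires
            return toAdd.equals(peek());
        }
        // the sub-queue head is unchanged, so the global head cannot have changed either
        return false;
    }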

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199816118
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java
 ---
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link 
org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * IMPORTANT: The store is ordered and the order is determined by the 
lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param  the type of stored elements.
+ */
+public class RocksDBOrderedSetStore implements 
CachingInternalPriorityQueueSet.OrderedSetStore {
+
+   /** Serialized empty value to insert into RocksDB. */
+   private static final byte[] DUMMY_BYTES = 
"0".getBytes(ConfigConstants.DEFAULT_CHARSET);
--- End diff --

👍 
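Regarding the IMPORTANT note in the quoted javadoc, a small illustration of why
the serialization format matters (a sketch, not code from the PR):
DataOutputViewStreamWrapper writes longs big-endian, so for non-negative
priorities the unsigned byte-wise order of the serialized form matches the
numeric order, which is exactly what makes RocksDB's lexicographic key iteration
return elements in priority order.

    // sketch; uses java.io.ByteArrayOutputStream and java.io.IOException
    static byte[] serializePriority(long priority) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(8);
        new DataOutputViewStreamWrapper(bytes).writeLong(priority); // big-endian
        return bytes.toByteArray();
    }
    // e.g. serializePriority(42L) compares byte-wise as smaller than serializePriority(1000L)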


---


[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199815863
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java
 ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+/**
+ * This implementation of {@link InternalPriorityQueue} is internally 
partitioned into sub-queues per key-group and
+ * essentially works as a heap-of-heaps. Instances will have set semantics 
for elements if the sub-queues have set
+ * semantics.
+ *
+ * @param <T> the type of elements in the queue.
+ * @param <PQ> the type of sub-queue used for each key-group partition.
+ */
+public class KeyGroupPartitionedPriorityQueue<T, PQ extends InternalPriorityQueue<T> & HeapPriorityQueueElement>
+   implements InternalPriorityQueue<T> {
+
+   /** A heap of heap sets. Each sub-heap represents the partition for a 
key-group.*/
+   @Nonnull
+   private final HeapPriorityQueue keyGroupHeap;
+
+   /** All elements from keyGroupHeap, indexed by their key-group id, 
relative to firstKeyGroup. */
+   @Nonnull
+   private final PQ[] keyGroupLists;
--- End diff --

👍 


---


[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199814790
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java
 ---
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A heap-based priority queue with set semantics, based on {@link 
HeapPriorityQueue}. The heap is supported by hash
+ * set for fast contains (de-duplication) and deletes. Object 
identification happens based on {@link #equals(Object)}.
+ *
+ * Possible future improvements:
+ * 
+ *  We could also implement shrinking for the heap and the 
deduplication set.
+ *  We could replace the deduplication maps with more efficient custom 
implementations. In particular, a hash set
+ * would be enough if it could return existing elements on unsuccessful 
adding, etc..
+ * 
+ *
+ * @param  type of the contained elements.
+ */
+public class HeapPriorityQueueSet 
extends HeapPriorityQueue {
+
+   /**
+* Function to extract the key from contained elements.
+*/
+   private final KeyExtractorFunction keyExtractor;
+
+   /**
+* This array contains one hash set per key-group. The sets are used 
for fast de-duplication and deletes of elements.
+*/
+   private final HashMap[] deduplicationMapsByKeyGroup;
+
+   /**
+* The key-group range of elements that are managed by this queue.
+*/
+   private final KeyGroupRange keyGroupRange;
+
+   /**
+* The total number of key-groups of the job.
+*/
+   private final int totalNumberOfKeyGroups;
+
+   /**
+* Creates an empty {@link HeapPriorityQueueSet} with the requested 
initial capacity.
+*
+* @param elementComparator comparator for the contained elements.
+* @param keyExtractor function to extract a key from the contained 
elements.
+* @param minimumCapacity the minimum and initial capacity of this 
priority queue.
+* @param keyGroupRange the key-group range of the elements in this set.
+* @param totalNumberOfKeyGroups the total number of key-groups of the 
job.
+*/
+   @SuppressWarnings("unchecked")
+   public HeapPriorityQueueSet(
+   @Nonnull Comparator elementComparator,
+   @Nonnull KeyExtractorFunction keyExtractor,
+   @Nonnegative int minimumCapacity,
+   @Nonnull KeyGroupRange keyGroupRange,
+   @Nonnegative int totalNumberOfKeyGroups) {
+
+   super(elementComparator, minimumCapacity);
+
+   this.keyExtractor = keyExtractor;
+
+   this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
+   this.keyGroupRange = keyGroupRange;
+
+   final int keyGroupsInLocalRange = 
keyGroupRange.getNumberOfKeyGroups();
+   final int deduplicationSetSize = 1 + minimumCapacity / 
keyGroupsInLocalRange;
+   this.deduplicationMapsByKeyGroup = new 
HashMap[keyGroupsInLocalRange];
+   for (int i = 0; i < keyGroupsInLocalRange; ++i) {
+   deduplicationMapsByKeyGroup[i] = new 
HashMap<>(deduplicationSetSize);
+   }
+   }
+
+   @Override
+   @Nullable
+   pu
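The quoted diff is truncated here. For illustration, a sketch of how an element
is typically routed to the de-duplication map of its key-group, assuming the
extractor method is called extractKeyFromElement (an assumption, not the PR's
exact code):

    private HashMap<T, T> getDedupMapForElement(T element) {
        final int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(
            keyExtractor.extractKeyFromElement(element), totalNumberOfKeyGroups);
        // index the array relative to the first key-group of this operator's range
        return deduplicationMapsByKeyGroup[keyGroup - keyGroupRange.getStartKeyGroup()];
    }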

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199814578
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
 ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
+
+/**
+ * Basic heap-based priority queue for {@link HeapPriorityQueueElement} 
objects. This heap supports fast deletes
+ * because it manages position indexes of the contained {@link 
HeapPriorityQueueElement}s. The heap implementation is
+ * a simple binary tree stored inside an array. Element indexes in the 
heap array start at 1 instead of 0 to make array
+ * index computations a bit simpler in the hot methods. Object 
identification of remove is based on object identity and
+ * not on equals.
+ *
+ * Possible future improvements:
+ * 
+ *  We could also implement shrinking for the heap.
+ * 
+ *
+ * @param  type of the contained elements.
+ */
+public class HeapPriorityQueue 
implements InternalPriorityQueue {
+
+   /**
+* The index of the head element in the array that represents the heap.
+*/
+   private static final int QUEUE_HEAD_INDEX = 1;
+
+   /**
+* Comparator for the contained elements.
+*/
+   private final Comparator elementComparator;
+
+   /**
+* The array that represents the heap-organized priority queue.
+*/
+   private T[] queue;
+
+   /**
+* The current size of the priority queue.
+*/
+   private int size;
+
+   /**
+* Creates an empty {@link HeapPriorityQueue} with the requested 
initial capacity.
+*
+* @param elementComparator comparator for the contained elements.
+* @param minimumCapacity the minimum and initial capacity of this 
priority queue.
+*/
+   @SuppressWarnings("unchecked")
+   public HeapPriorityQueue(
+   @Nonnull Comparator elementComparator,
+   @Nonnegative int minimumCapacity) {
+
+   this.elementComparator = elementComparator;
+   this.queue = (T[]) new 
HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity];
+   }
+
+   @Override
+   @Nullable
+   public T poll() {
+   return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : 
null;
+   }
+
+   @Override
+   @Nullable
+   public T peek() {
+   return size() > 0 ? queue[QUEUE_HEAD_INDEX] : null;
+   }
+
+   /**
+* Adds the element to add to the heap. This element should not be 
managed by any other {@link HeapPriorityQueue}.
+*
+* @return true if the operation changed the head element 
or if it is unclear whether the head element changed.
+* Only returns false iff the head element was not changed 
by this operation.
+*/
+   @Override
+   public boolean add(@Nonnull T toAdd) {
+   return addInternal(toAdd);
+   }
+
+   /**
+* This remove is based on object identity, not the result of equals.
+*
+* @return true if the operation changed the head element 
or if it is unclear whether the head element changed.
+* Only returns false iff the head element was not changed 
by this operation.
+*/
+   @Override
+   public boolean remove(@Nonnull T toStop) {
+   return removeInternal(toStop);
+   }
+

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199813559
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
 ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
+
+/**
+ * Basic heap-based priority queue for {@link HeapPriorityQueueElement} 
objects. This heap supports fast deletes
+ * because it manages position indexes of the contained {@link 
HeapPriorityQueueElement}s. The heap implementation is
+ * a simple binary tree stored inside an array. Element indexes in the 
heap array start at 1 instead of 0 to make array
+ * index computations a bit simpler in the hot methods. Object 
identification of remove is based on object identity and
+ * not on equals.
+ *
+ * Possible future improvements:
+ * 
+ *  We could also implement shrinking for the heap.
+ * 
+ *
+ * @param  type of the contained elements.
+ */
+public class HeapPriorityQueue 
implements InternalPriorityQueue {
+
+   /**
+* The index of the head element in the array that represents the heap.
+*/
+   private static final int QUEUE_HEAD_INDEX = 1;
+
+   /**
+* Comparator for the contained elements.
+*/
+   private final Comparator elementComparator;
+
+   /**
+* The array that represents the heap-organized priority queue.
+*/
+   private T[] queue;
+
+   /**
+* The current size of the priority queue.
+*/
+   private int size;
+
+   /**
+* Creates an empty {@link HeapPriorityQueue} with the requested 
initial capacity.
+*
+* @param elementComparator comparator for the contained elements.
+* @param minimumCapacity the minimum and initial capacity of this 
priority queue.
+*/
+   @SuppressWarnings("unchecked")
+   public HeapPriorityQueue(
+   @Nonnull Comparator elementComparator,
+   @Nonnegative int minimumCapacity) {
+
+   this.elementComparator = elementComparator;
+   this.queue = (T[]) new 
HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity];
+   }
+
+   @Override
+   @Nullable
+   public T poll() {
+   return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : 
null;
+   }
+
+   @Override
+   @Nullable
+   public T peek() {
+   return size() > 0 ? queue[QUEUE_HEAD_INDEX] : null;
+   }
+
+   /**
+* Adds the element to add to the heap. This element should not be 
managed by any other {@link HeapPriorityQueue}.
+*
+* @return true if the operation changed the head element 
or if it is unclear whether the head element changed.
+* Only returns false iff the head element was not changed 
by this operation.
+*/
+   @Override
+   public boolean add(@Nonnull T toAdd) {
+   return addInternal(toAdd);
+   }
+
+   /**
+* This remove is based on object identity, not the result of equals.
+*
+* @return true if the operation changed the head element 
or if it is unclear whether the head element changed.
+* Only returns false iff the head element was not changed 
by this operation.
+*/
+   @Override
+   public boolean remove(@Nonnull T toStop) {
--- End diff --

👍 
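For context on why the identity-based, index-managed remove matters, a small
usage sketch (TimerElement is a made-up element type, not from the PR):

    class TimerElement implements HeapPriorityQueueElement {
        final long timestamp;
        private int index;
        TimerElement(long timestamp) { this.timestamp = timestamp; }
        long getTimestamp() { return timestamp; }
        @Override public int getInternalIndex() { return index; }
        @Override public void setInternalIndex(int newIndex) { this.index = newIndex; }
    }

    HeapPriorityQueue<TimerElement> queue =
        new HeapPriorityQueue<>(Comparator.comparingLong(TimerElement::getTimestamp), 128);
    TimerElement timer = new TimerElement(42L);
    queue.add(timer);
    // the element's managed index lets the heap delete it in O(log n), without
    // the linear scan that java.util.PriorityQueue#remove(Object) needs
    queue.remove(timer);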


---


[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199813529
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java
 ---
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.runtime.state.InternalPriorityQueue;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE;
+
+/**
+ * Basic heap-based priority queue for {@link HeapPriorityQueueElement} 
objects. This heap supports fast deletes
+ * because it manages position indexes of the contained {@link 
HeapPriorityQueueElement}s. The heap implementation is
+ * a simple binary tree stored inside an array. Element indexes in the 
heap array start at 1 instead of 0 to make array
+ * index computations a bit simpler in the hot methods. Object 
identification of remove is based on object identity and
--- End diff --

👍 


---


[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199812541
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java
 ---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Function to extract a key from a given object.
+ *
+ * @param <T> type of the element from which we extract the key.
+ */
+@FunctionalInterface
+public interface KeyExtractorFunction<T> {
--- End diff --

I find it useful when concepts have a name and some form of typing attached to 
them; otherwise, you end up with a lot of `Function<>` objects and have to 
think twice about their concrete use-case.
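A small sketch of the difference, mirroring the KEY_EXTRACTOR_FUNCTION constant
from the quoted test base (assuming the interface exposes a single extractor
method; not code from the PR):

    // the dedicated interface names the concept at the use-site ...
    KeyExtractorFunction<TestElement> byKey = TestElement::getKey;

    // ... whereas a plain java.util.function.Function compiles just as well but
    // tells the reader nothing about its role as "the key extractor"
    java.util.function.Function<TestElement, Object> alsoByKey = TestElement::getKey;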


---


[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199812074
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * Interface for collection that gives in order access to elements w.r.t 
their priority.
+ *
+ * @param  type of elements in the ordered set.
+ */
+@Internal
+public interface InternalPriorityQueue {
+
+   /**
+* Retrieves and removes the first element (w.r.t. the order) of this 
set,
+* or returns {@code null} if this set is empty.
+*
+* @return the first element of this ordered set, or {@code null} if 
this set is empty.
+*/
+   @Nullable
+   T poll();
+
+   /**
+* Retrieves, but does not remove, the element (w.r.t. order) of this 
set,
+* or returns {@code null} if this set is empty.
+*
+* @return the first element (w.r.t. order) of this ordered set, or 
{@code null} if this set is empty.
+*/
+   @Nullable
+   T peek();
+
+   /**
+* Adds the given element to the set, if it is not already contained.
+*
+* @param toAdd the element to add to the set.
+* @return true if the operation changed the head element 
or if it is unclear whether the head element changed.
+* Only returns false iff the head element was not changed 
by this operation.
+*/
+   boolean add(@Nonnull T toAdd);
+
+   /**
+* Removes the given element from the set, if is contained in the set.
+*
+* @param toRemove the element to remove.
+* @return true if the operation changed the head element 
or if it is unclear whether the head element changed.
+* Only returns false iff the head element was not changed 
by this operation.
--- End diff --

👍 


---


[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6196
  
Had a few more comments, but they are all basically optimizations. I leave it 
up to you whether you still want to address all or some of them. Please let me 
know. Otherwise, we can merge this. 👍 


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199783587
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field 
serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param  type of custom serialized value
+ */
+public abstract class CompositeSerializer extends TypeSerializer {
+   private static final long serialVersionUID = 1L;
+
+   /** Serializers for fields which constitute T. */
+   protected final TypeSerializer[] fieldSerializers;
+
+   /** Whether T is an immutable type. */
+   final boolean immutableTargetType;
+
+   /** Byte length of target object in serialized form. */
+   private final int length;
+
+   /** Whether any field serializer is stateful. */
+   private final boolean stateful;
+
+   private final int hashCode;
+
+   @SuppressWarnings("unchecked")
+   protected CompositeSerializer(boolean immutableTargetType, 
TypeSerializer ... fieldSerializers) {
+   Preconditions.checkNotNull(fieldSerializers);
+   
Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+   this.immutableTargetType = immutableTargetType &&
+   
Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
+   this.fieldSerializers = (TypeSerializer[]) 
fieldSerializers;
+   this.length = calcLength();
+   this.stateful = isStateful();
+   this.hashCode = Arrays.hashCode(fieldSerializers);
+   }
+
+   private boolean isStateful() {
+   TypeSerializer[] duplicatedSerializers = 
duplicateFieldSerializers();
+   return IntStream.range(0, fieldSerializers.length)
+   .anyMatch(i -> fieldSerializers[i] != 
duplicatedSerializers[i]);
+   }
+
+   /** Create new instance from its fields.  */
+   public abstract T createInstance(@Nonnull Object ... values);
+
+   /** Modify field of existing instance. Supported only by mutable types. 
*/
+   protected abstract void setField(@Nonnull T value, int index, Object 
fieldValue);
+
+   /** Get field of existing instance. */
+   protected abstract Object getField(@Nonnull T value, int index);
+
+   /** Factory for concrete serializer. */
+   protected abstract CompositeSerializer 
createSerializerInstance(TypeSerializer ... originalSerializers);
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return stateful ? 
createSerializerInstance(duplicateFieldSerializers()) : this;
--- End diff --

Another small point here for `createSerializerInstance(...)`: we have no 
(non-public) constructor that also takes the boolean flags, length, and (maybe) 
hash directly. So when we copy the serializer, it always goes through the whole 
computation again to figure these out, although we could simply copy them from 
the previous instance.
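A sketch of the kind of constructor meant here (an assumption, not code from 
the PR):

    /** Copy constructor for duplicate(): reuses the precomputed properties. */
    protected CompositeSerializer(
            CompositeSerializer<T> toCopy,
            TypeSerializer<Object>[] duplicatedFieldSerializers) {
        this.fieldSerializers = duplicatedFieldSerializers;
        this.immutableTargetType = toCopy.immutableTargetType;
        this.length = toCopy.length;        // copied instead of recomputed
        this.stateful = toCopy.stateful;    // copied instead of recomputed
        this.hashCode = toCopy.hashCode;    // copied instead of recomputed
    }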


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199782860
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field 
serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param  type of custom serialized value
+ */
+public abstract class CompositeSerializer extends TypeSerializer {
+   private static final long serialVersionUID = 1L;
+
+   /** Serializers for fields which constitute T. */
+   protected final TypeSerializer[] fieldSerializers;
+
+   /** Whether T is an immutable type. */
+   final boolean immutableTargetType;
+
+   /** Byte length of target object in serialized form. */
+   private final int length;
+
+   /** Whether any field serializer is stateful. */
+   private final boolean stateful;
+
+   private final int hashCode;
+
+   @SuppressWarnings("unchecked")
+   protected CompositeSerializer(boolean immutableTargetType, 
TypeSerializer ... fieldSerializers) {
+   Preconditions.checkNotNull(fieldSerializers);
+   
Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+   this.immutableTargetType = immutableTargetType &&
+   
Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
+   this.fieldSerializers = (TypeSerializer[]) 
fieldSerializers;
+   this.length = calcLength();
+   this.stateful = isStateful();
+   this.hashCode = Arrays.hashCode(fieldSerializers);
+   }
+
+   private boolean isStateful() {
+   TypeSerializer[] duplicatedSerializers = 
duplicateFieldSerializers();
--- End diff --

The flag for `isStateful()` is the only one that I suggested as a candidate 
for lazy initialization when `duplicate()` is called for the first time. The 
reason is that duplicating some types of inner serializers can be a bit 
expensive. But again, I feel that this can also be changed in follow-up work, 
if needed.
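A sketch of the lazy variant (an assumption, not code from the PR): the boolean 
field becomes a nullable Boolean, and the duplication-based statefulness check 
only runs on the first duplicate() call, so the constructor stays cheap.

    private Boolean stateful; // null until the first duplicate() call

    @Override
    public CompositeSerializer<T> duplicate() {
        TypeSerializer<Object>[] duplicatedSerializers = duplicateFieldSerializers();
        if (stateful == null) {
            // stateful iff duplicating any field serializer yields a new instance
            stateful = IntStream.range(0, fieldSerializers.length)
                .anyMatch(i -> fieldSerializers[i] != duplicatedSerializers[i]);
        }
        return stateful ? createSerializerInstance(duplicatedSerializers) : this;
    }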


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199782303
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field 
serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param  type of custom serialized value
+ */
+public abstract class CompositeSerializer extends TypeSerializer {
+   private static final long serialVersionUID = 1L;
+
+   /** Serializers for fields which constitute T. */
+   protected final TypeSerializer[] fieldSerializers;
+
+   /** Whether T is an immutable type. */
+   final boolean immutableTargetType;
+
+   /** Byte length of target object in serialized form. */
+   private final int length;
+
+   /** Whether any field serializer is stateful. */
+   private final boolean stateful;
+
+   private final int hashCode;
+
+   @SuppressWarnings("unchecked")
+   protected CompositeSerializer(boolean immutableTargetType, 
TypeSerializer ... fieldSerializers) {
+   Preconditions.checkNotNull(fieldSerializers);
+   
Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+   this.immutableTargetType = immutableTargetType &&
+   
Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
+   this.fieldSerializers = (TypeSerializer[]) 
fieldSerializers;
+   this.length = calcLength();
+   this.stateful = isStateful();
+   this.hashCode = Arrays.hashCode(fieldSerializers);
--- End diff --

I think up to this point the code iterates `fieldSerializers` 5 times (null 
checks, immutability check, length calculation, stateful check, and hash code 
computation). It could be done in one iteration, but since this method should 
typically not be called in hot loops, this is an optional improvement.
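For illustration, a sketch of how the constructor could compute everything in a 
single pass over the serializers (an assumption, not code from the PR):

    // one pass replaces the separate null check, immutability check, length
    // calculation, statefulness check and hash code computation
    TypeSerializer<Object>[] duplicatedSerializers = duplicateFieldSerializers();
    boolean allFieldsImmutable = true;
    boolean anyStateful = false;
    int totalLength = 0;
    int hash = 1;
    for (int i = 0; i < fieldSerializers.length; i++) {
        TypeSerializer<Object> serializer = Preconditions.checkNotNull(fieldSerializers[i]);
        allFieldsImmutable &= serializer.isImmutableType();
        anyStateful |= serializer != duplicatedSerializers[i];
        int fieldLength = serializer.getLength();
        // one variable-length field makes the whole serialized form variable-length
        totalLength = (totalLength < 0 || fieldLength < 0) ? -1 : totalLength + fieldLength;
        hash = 31 * hash + serializer.hashCode(); // same result as Arrays.hashCode
    }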


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199485075
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field 
serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param  type of custom serialized value
+ */
+public abstract class CompositeSerializer extends TypeSerializer {
+   private static final long serialVersionUID = 1L;
+
+   protected final TypeSerializer[] fieldSerializers;
+   final boolean isImmutableTargetType;
--- End diff --

I think in Java code style, a boolean field name should not be prefixed with 
`is...`; only the getter should carry that prefix.
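For example:

    private final boolean immutableTargetType;  // field: no "is" prefix

    public boolean isImmutableTargetType() {    // getter: carries the "is" prefix
        return immutableTargetType;
    }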


---


[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...

2018-07-02 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6196
  
I had a few more comments, in particular some improvements for the new 
serializer. I think when those are addressed this is good to merge.


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199477483
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+@SuppressWarnings("unchecked")
+public class TtlStateFactory {
--- End diff --

It might make sense to also have a test when we introduce a new class that 
provides some kind of "service", e.g. to check that all state types are mapped 
correctly and to prevent somebody from breaking the mapping by accident.
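A rough sketch of what such a test could look like; createTtlState(...) stands 
in for the factory's real entry point and is a hypothetical name here, not the 
PR's API:

    @Test
    public void testAllStateDescriptorTypesAreMapped() throws Exception {
        List<StateDescriptor<?, ?>> descriptors = Arrays.<StateDescriptor<?, ?>>asList(
            new ValueStateDescriptor<>("value", StringSerializer.INSTANCE),
            new ListStateDescriptor<>("list", StringSerializer.INSTANCE),
            new MapStateDescriptor<>("map", StringSerializer.INSTANCE, StringSerializer.INSTANCE),
            new ReducingStateDescriptor<String>("reducing", (a, b) -> a, StringSerializer.INSTANCE));

        for (StateDescriptor<?, ?> descriptor : descriptors) {
            // no descriptor type may fall through to the "unsupported" exception branch
            Assert.assertNotNull(createTtlState(descriptor));
        }
    }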


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199476870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+@SuppressWarnings("unchecked")
--- End diff --

I would not suppress warnings in the scope of a full class; it is better to 
apply the suppression in a more fine-grained way on individual methods.
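
For illustration, a sketch of narrowing the suppression (class and method names 
are made up for the example):

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;

public class SuppressionScopeExample {

	// Instead of @SuppressWarnings("unchecked") on the whole class, narrow it
	// to the one member that actually performs the unchecked cast.
	@SuppressWarnings("unchecked")
	private static <T> TypeSerializer<T>[] castToTyped(TypeSerializer<?>[] serializers) {
		return (TypeSerializer<T>[]) serializers;
	}
}
```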


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199474873
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field 
serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param <T> type of custom serialized value
+ */
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private static final long serialVersionUID = 1L;
+
+   protected final TypeSerializer[] fieldSerializers;
+   final boolean isImmutableTargetType;
+   private final int length;
+
+   @SuppressWarnings("unchecked")
+   protected CompositeSerializer(boolean isImmutableTargetType, 
TypeSerializer ... fieldSerializers) {
+   Preconditions.checkNotNull(fieldSerializers);
+   
Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+   this.isImmutableTargetType = isImmutableTargetType;
+   this.fieldSerializers = (TypeSerializer[]) 
fieldSerializers;
+   this.length = calcLength();
+   }
+
+   /** Create new instance from its fields.  */
+   public abstract T createInstance(@Nonnull Object ... values);
+
+   /** Modify field of existing instance. Supported only by mutable types. 
*/
+   protected abstract void setField(@Nonnull T value, int index, Object 
fieldValue);
+
+   /** Get field of existing instance. */
+   protected abstract Object getField(@Nonnull T value, int index);
+
+   /** Factory for concrete serializer. */
+   protected abstract CompositeSerializer 
createSerializerInstance(TypeSerializer ... originalSerializers);
+
+   @Override
+   public CompositeSerializer duplicate() {
+   TypeSerializer[] duplicatedSerializers = new 
TypeSerializer[fieldSerializers.length];
+   boolean stateful = false;
+   for (int index = 0; index < fieldSerializers.length; index++) {
+   duplicatedSerializers[index] = 
fieldSerializers[index].duplicate();
+   if (fieldSerializers[index] != 
duplicatedSerializers[index]) {
+   stateful = true;
+   }
+   }
+   return stateful ? 
createSerializerInstance(duplicatedSerializers) : this;
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   for (TypeSerializer fieldSerializer : fieldSerializers) 
{
--- End diff --

We can compute many things like length, immutability, etc. already in the 
constructor. Statelessness is the one thing that we might want to figure out 
and remember on the first `duplicate()` attempt.
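
A rough sketch of that direction, assuming a hypothetical subclass just for 
illustration (not the actual patch):

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;

import java.util.Arrays;

public abstract class PrecomputingCompositeSerializer<T> extends TypeSerializer<T> {

	protected final TypeSerializer<Object>[] fieldSerializers;
	private final boolean immutable;   // computed once in the constructor

	@SuppressWarnings("unchecked")
	protected PrecomputingCompositeSerializer(boolean immutableTargetType, TypeSerializer<?>... fieldSerializers) {
		this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
		this.immutable = immutableTargetType
			&& Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType);
	}

	@Override
	public boolean isImmutableType() {
		return immutable;   // O(1), no per-call loop over the field serializers
	}
}
```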


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199474421
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field 
serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param <T> type of custom serialized value
+ */
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private static final long serialVersionUID = 1L;
+
+   protected final TypeSerializer[] fieldSerializers;
+   final boolean isImmutableTargetType;
+   private final int length;
+
+   @SuppressWarnings("unchecked")
+   protected CompositeSerializer(boolean isImmutableTargetType, 
TypeSerializer ... fieldSerializers) {
+   Preconditions.checkNotNull(fieldSerializers);
+   
Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+   this.isImmutableTargetType = isImmutableTargetType;
+   this.fieldSerializers = (TypeSerializer[]) 
fieldSerializers;
+   this.length = calcLength();
+   }
+
+   /** Create new instance from its fields.  */
+   public abstract T createInstance(@Nonnull Object ... values);
+
+   /** Modify field of existing instance. Supported only by mutable types. 
*/
+   protected abstract void setField(@Nonnull T value, int index, Object 
fieldValue);
+
+   /** Get field of existing instance. */
+   protected abstract Object getField(@Nonnull T value, int index);
+
+   /** Factory for concrete serializer. */
+   protected abstract CompositeSerializer 
createSerializerInstance(TypeSerializer ... originalSerializers);
+
+   @Override
+   public CompositeSerializer duplicate() {
+   TypeSerializer[] duplicatedSerializers = new 
TypeSerializer[fieldSerializers.length];
+   boolean stateful = false;
+   for (int index = 0; index < fieldSerializers.length; index++) {
+   duplicatedSerializers[index] = 
fieldSerializers[index].duplicate();
+   if (fieldSerializers[index] != 
duplicatedSerializers[index]) {
+   stateful = true;
+   }
+   }
+   return stateful ? 
createSerializerInstance(duplicatedSerializers) : this;
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   for (TypeSerializer fieldSerializer : fieldSerializers) 
{
+   if (!fieldSerializer.isImmutableType()) {
+   return false;
+   }
+   }
+   return isImmutableTargetType;
+   }
+
+   @Override
+   public T createInstance() {
+   Object[] fields = new Object[fieldSerializers.length];
+   for (int index = 0; index < fieldSerializers.length; index++) {
+   fields[index] = 
fieldSerializers[index].createInstance();
+   }
+   return createInstance(fields);
+   }
+
+   @Override
+   public T copy(T from) {
+   Preconditions.checkNotNull(from);
+   Object[] fields = new Object[fieldSerializers.length];
+   for (int index = 0; index < fieldSerializers.length; index++)

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199474494
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field 
serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param <T> type of custom serialized value
+ */
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private static final long serialVersionUID = 1L;
+
+   protected final TypeSerializer[] fieldSerializers;
+   final boolean isImmutableTargetType;
+   private final int length;
+
+   @SuppressWarnings("unchecked")
+   protected CompositeSerializer(boolean isImmutableTargetType, 
TypeSerializer ... fieldSerializers) {
+   Preconditions.checkNotNull(fieldSerializers);
+   
Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+   this.isImmutableTargetType = isImmutableTargetType;
+   this.fieldSerializers = (TypeSerializer[]) 
fieldSerializers;
+   this.length = calcLength();
+   }
+
+   /** Create new instance from its fields.  */
+   public abstract T createInstance(@Nonnull Object ... values);
+
+   /** Modify field of existing instance. Supported only by mutable types. 
*/
+   protected abstract void setField(@Nonnull T value, int index, Object 
fieldValue);
+
+   /** Get field of existing instance. */
+   protected abstract Object getField(@Nonnull T value, int index);
+
+   /** Factory for concrete serializer. */
+   protected abstract CompositeSerializer 
createSerializerInstance(TypeSerializer ... originalSerializers);
+
+   @Override
+   public CompositeSerializer duplicate() {
+   TypeSerializer[] duplicatedSerializers = new 
TypeSerializer[fieldSerializers.length];
+   boolean stateful = false;
+   for (int index = 0; index < fieldSerializers.length; index++) {
+   duplicatedSerializers[index] = 
fieldSerializers[index].duplicate();
+   if (fieldSerializers[index] != 
duplicatedSerializers[index]) {
+   stateful = true;
+   }
+   }
+   return stateful ? 
createSerializerInstance(duplicatedSerializers) : this;
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   for (TypeSerializer fieldSerializer : fieldSerializers) 
{
+   if (!fieldSerializer.isImmutableType()) {
+   return false;
+   }
+   }
+   return isImmutableTargetType;
+   }
+
+   @Override
+   public T createInstance() {
+   Object[] fields = new Object[fieldSerializers.length];
+   for (int index = 0; index < fieldSerializers.length; index++) {
+   fields[index] = 
fieldSerializers[index].createInstance();
+   }
+   return createInstance(fields);
+   }
+
+   @Override
+   public T copy(T from) {
+   Preconditions.checkNotNull(from);
+   Object[] fields = new Object[fieldSerializers.length];
+   for (int index = 0; index < fieldSerializers.length; index++)

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199474258
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field 
serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param <T> type of custom serialized value
+ */
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private static final long serialVersionUID = 1L;
+
+   protected final TypeSerializer[] fieldSerializers;
+   final boolean isImmutableTargetType;
+   private final int length;
+
+   @SuppressWarnings("unchecked")
+   protected CompositeSerializer(boolean isImmutableTargetType, 
TypeSerializer ... fieldSerializers) {
+   Preconditions.checkNotNull(fieldSerializers);
+   
Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+   this.isImmutableTargetType = isImmutableTargetType;
+   this.fieldSerializers = (TypeSerializer[]) 
fieldSerializers;
+   this.length = calcLength();
+   }
+
+   /** Create new instance from its fields.  */
+   public abstract T createInstance(@Nonnull Object ... values);
+
+   /** Modify field of existing instance. Supported only by mutable types. 
*/
+   protected abstract void setField(@Nonnull T value, int index, Object 
fieldValue);
+
+   /** Get field of existing instance. */
+   protected abstract Object getField(@Nonnull T value, int index);
+
+   /** Factory for concrete serializer. */
+   protected abstract CompositeSerializer 
createSerializerInstance(TypeSerializer ... originalSerializers);
+
+   @Override
+   public CompositeSerializer duplicate() {
+   TypeSerializer[] duplicatedSerializers = new 
TypeSerializer[fieldSerializers.length];
+   boolean stateful = false;
+   for (int index = 0; index < fieldSerializers.length; index++) {
+   duplicatedSerializers[index] = 
fieldSerializers[index].duplicate();
+   if (fieldSerializers[index] != 
duplicatedSerializers[index]) {
+   stateful = true;
+   }
+   }
+   return stateful ? 
createSerializerInstance(duplicatedSerializers) : this;
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   for (TypeSerializer fieldSerializer : fieldSerializers) 
{
--- End diff --

Why not compute this once in the constructor and remember it in a boolean flag?


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-07-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r199474152
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a composite type using array of its field 
serializers.
+ * Fields are indexed the same way as their serializers.
+ *
+ * @param <T> type of custom serialized value
+ */
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private static final long serialVersionUID = 1L;
+
+   protected final TypeSerializer[] fieldSerializers;
+   final boolean isImmutableTargetType;
+   private final int length;
+
+   @SuppressWarnings("unchecked")
+   protected CompositeSerializer(boolean isImmutableTargetType, 
TypeSerializer ... fieldSerializers) {
+   Preconditions.checkNotNull(fieldSerializers);
+   
Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull));
+   this.isImmutableTargetType = isImmutableTargetType;
+   this.fieldSerializers = (TypeSerializer[]) 
fieldSerializers;
+   this.length = calcLength();
+   }
+
+   /** Create new instance from its fields.  */
+   public abstract T createInstance(@Nonnull Object ... values);
+
+   /** Modify field of existing instance. Supported only by mutable types. 
*/
+   protected abstract void setField(@Nonnull T value, int index, Object 
fieldValue);
+
+   /** Get field of existing instance. */
+   protected abstract Object getField(@Nonnull T value, int index);
+
+   /** Factory for concrete serializer. */
+   protected abstract CompositeSerializer 
createSerializerInstance(TypeSerializer ... originalSerializers);
+
+   @Override
+   public CompositeSerializer duplicate() {
+   TypeSerializer[] duplicatedSerializers = new 
TypeSerializer[fieldSerializers.length];
+   boolean stateful = false;
+   for (int index = 0; index < fieldSerializers.length; index++) {
+   duplicatedSerializers[index] = 
fieldSerializers[index].duplicate();
+   if (fieldSerializers[index] != 
duplicatedSerializers[index]) {
--- End diff --

I wonder if we need to do these checks every time `duplicate()` is called? 
We could check it once, remember whether all field serializers are stateless, 
and from that point on return `this` immediately.
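
A sketch of how that caching could look; class, field, and method names are 
illustrative, not the actual implementation:

```java
import org.apache.flink.api.common.typeutils.TypeSerializer;

public abstract class StatelessAwareCompositeSerializer<T> extends TypeSerializer<T> {

	protected final TypeSerializer<Object>[] fieldSerializers;

	// null = not yet determined; set after the first duplicate() call
	private transient Boolean allFieldSerializersStateless;

	@SuppressWarnings("unchecked")
	protected StatelessAwareCompositeSerializer(TypeSerializer<?>... fieldSerializers) {
		this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
	}

	protected abstract TypeSerializer<T> createSerializerInstance(TypeSerializer<?>... duplicatedSerializers);

	@Override
	public TypeSerializer<T> duplicate() {
		if (Boolean.TRUE.equals(allFieldSerializersStateless)) {
			// all field serializers returned themselves last time: nothing to duplicate
			return this;
		}
		TypeSerializer<?>[] duplicated = new TypeSerializer<?>[fieldSerializers.length];
		boolean stateful = false;
		for (int i = 0; i < fieldSerializers.length; i++) {
			duplicated[i] = fieldSerializers[i].duplicate();
			if (duplicated[i] != fieldSerializers[i]) {
				stateful = true;
			}
		}
		allFieldSerializersStateless = !stateful;
		return stateful ? createSerializerInstance(duplicated) : this;
	}
}
```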


---


[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...

2018-07-02 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6228
  
@sihuazhou thanks for the fast review. I addressed all your comments.


---


[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199442186
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java
 ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link 
org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * IMPORTANT: The store is ordered and the order is determined by the 
lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+
+   /** Serialized empty value to insert into RocksDB. */
+   private static final byte[] DUMMY_BYTES = 
"0".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+   /** The RocksDB instance that serves as store. */
+   @Nonnull
+   private final RocksDB db;
+
+   /** Handle to the column family of the RocksDB instance in which the 
elements are stored. */
+   @Nonnull
+   private final ColumnFamilyHandle columnFamilyHandle;
+
+   /** Read options for RocksDB. */
+   @Nonnull
+   private final ReadOptions readOptions;
+
+   /**
+* Serializer for the contained elements. The lexicographical order of 
the bytes of serialized objects must be
+* aligned with their logical order.
+*/
+   @Nonnull
+   private final TypeSerializer byteOrderProducingSerializer;
+
+   /** Wrapper to batch all writes to RocksDB. */
+   @Nonnull
+   private final RocksDBWriteBatchWrapper batchWrapper;
+
+   /** The key-group id of all elements stored in this instance. */
+   @Nonnegative
+   private final int keyGroupId;
+
+   /** The key-group id in serialized form. */
+   @Nonnull
+   private final byte[] groupPrefixBytes;
+
+   /** Output stream that helps to serialize elements. */
+   @Nonnull
+   private final ByteArrayOutputStreamWithPos outputStream;
+
+   /** Output view that helps to serialize elements, must wrap the output 
stream. */
+   @Nonnull
+   private final DataOutputViewStreamWrapper outputView;
+
+   public RocksDBOrderedStore(
+   @Nonnegative int keyGroupId,
+   @Nonnull RocksDB db,
+   @Nonnull ColumnFamilyHandle columnFamilyHandle,
+   @Nonnull ReadOptions readOptions,
+   @Nonnull TypeSerializer byteOrderProducingSerializer,
+   @Nonnull ByteArrayOutputStreamWithPos outputStream,
+   @Nonnull DataOutputViewStreamWrapper outputView,
+   @Nonnull RocksDBWriteBatchWrapper batchWrapper) {
+   this.db = db;
+   this.columnFamilyHandle = columnFamilyHandle;
+   

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199427083
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java
 ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link 
org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * IMPORTANT: The store is ordered and the order is determined by the 
lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+
+   /** Serialized empty value to insert into RocksDB. */
+   private static final byte[] DUMMY_BYTES = 
"0".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+   /** The RocksDB instance that serves as store. */
+   @Nonnull
+   private final RocksDB db;
+
+   /** Handle to the column family of the RocksDB instance in which the 
elements are stored. */
+   @Nonnull
+   private final ColumnFamilyHandle columnFamilyHandle;
+
+   /** Read options for RocksDB. */
+   @Nonnull
+   private final ReadOptions readOptions;
+
+   /**
+* Serializer for the contained elements. The lexicographical order of 
the bytes of serialized objects must be
+* aligned with their logical order.
+*/
+   @Nonnull
+   private final TypeSerializer byteOrderProducingSerializer;
+
+   /** Wrapper to batch all writes to RocksDB. */
+   @Nonnull
+   private final RocksDBWriteBatchWrapper batchWrapper;
+
+   /** The key-group id of all elements stored in this instance. */
+   @Nonnegative
+   private final int keyGroupId;
+
+   /** The key-group id in serialized form. */
+   @Nonnull
+   private final byte[] groupPrefixBytes;
+
+   /** Output stream that helps to serialize elements. */
+   @Nonnull
+   private final ByteArrayOutputStreamWithPos outputStream;
+
+   /** Output view that helps to serialize elements, must wrap the output 
stream. */
+   @Nonnull
+   private final DataOutputViewStreamWrapper outputView;
+
+   public RocksDBOrderedStore(
+   @Nonnegative int keyGroupId,
+   @Nonnull RocksDB db,
+   @Nonnull ColumnFamilyHandle columnFamilyHandle,
+   @Nonnull ReadOptions readOptions,
+   @Nonnull TypeSerializer byteOrderProducingSerializer,
+   @Nonnull ByteArrayOutputStreamWithPos outputStream,
+   @Nonnull DataOutputViewStreamWrapper outputView,
+   @Nonnull RocksDBWriteBatchWrapper batchWrapper) {
+   this.db = db;
+   this.columnFamilyHandle = columnFamilyHandle;
+   

[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...

2018-07-02 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6228#discussion_r199426052
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java
 ---
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * Implementation of {@link 
org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore}
+ * based on RocksDB.
+ *
+ * IMPORTANT: The store is ordered and the order is determined by the 
lexicographic order of the byte sequences
+ * produced by the provided serializer for the elements!
+ *
+ * @param <T> the type of stored elements.
+ */
+public class RocksDBOrderedStore<T> implements CachingInternalPriorityQueueSet.OrderedSetStore<T> {
+
+   /** Serialized empty value to insert into RocksDB. */
+   private static final byte[] DUMMY_BYTES = 
"0".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+   /** The RocksDB instance that serves as store. */
+   @Nonnull
+   private final RocksDB db;
+
+   /** Handle to the column family of the RocksDB instance in which the 
elements are stored. */
+   @Nonnull
+   private final ColumnFamilyHandle columnFamilyHandle;
+
+   /** Read options for RocksDB. */
+   @Nonnull
+   private final ReadOptions readOptions;
+
+   /**
+* Serializer for the contained elements. The lexicographical order of 
the bytes of serialized objects must be
+* aligned with their logical order.
+*/
+   @Nonnull
+   private final TypeSerializer byteOrderProducingSerializer;
+
+   /** Wrapper to batch all writes to RocksDB. */
+   @Nonnull
+   private final RocksDBWriteBatchWrapper batchWrapper;
+
+   /** The key-group id of all elements stored in this instance. */
+   @Nonnegative
+   private final int keyGroupId;
+
+   /** The key-group id in serialized form. */
+   @Nonnull
+   private final byte[] groupPrefixBytes;
+
+   /** Output stream that helps to serialize elements. */
+   @Nonnull
+   private final ByteArrayOutputStreamWithPos outputStream;
+
+   /** Output view that helps to serialize elements, must wrap the output 
stream. */
+   @Nonnull
+   private final DataOutputViewStreamWrapper outputView;
+
+   public RocksDBOrderedStore(
+   @Nonnegative int keyGroupId,
+   @Nonnull RocksDB db,
+   @Nonnull ColumnFamilyHandle columnFamilyHandle,
+   @Nonnull ReadOptions readOptions,
+   @Nonnull TypeSerializer byteOrderProducingSerializer,
+   @Nonnull ByteArrayOutputStreamWithPos outputStream,
+   @Nonnull DataOutputViewStreamWrapper outputView,
+   @Nonnull RocksDBWriteBatchWrapper batchWrapper) {
+   this.db = db;
+   this.columnFamilyHandle = columnFamilyHandle;
+   

[GitHub] flink issue #5799: [FLINK-7775] Remove unreferenced method PermanentBlobCach...

2018-06-29 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5799
  
LGTM 👍 Merging.


---


[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

2018-06-29 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6186
  
Merging this.


---


[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers

2018-06-29 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6186
  
Had one more minor comment. Besides that, this looks good 👍 Nice job!


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-29 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r199109435
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java 
---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps map state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <UK> Type of the user entry key of state with TTL
+ * @param <UV> Type of the user entry value of state with TTL
+ */
+class TtlMapState<K, N, UK, UV>
+   extends AbstractTtlState<K, N, Map<UK, UV>, Map<UK, TtlValue<UV>>, InternalMapState<K, N, UK, TtlValue<UV>>>
+   implements InternalMapState<K, N, UK, UV> {
+   TtlMapState(
+   InternalMapState<K, N, UK, TtlValue<UV>> original,
+   TtlConfig config,
+   TtlTimeProvider timeProvider,
+   TypeSerializer<Map<UK, TtlValue<UV>>> valueSerializer) {
+   super(original, config, timeProvider, valueSerializer);
+   }
+
+   @Override
+   public UV get(UK key) throws Exception {
+   return getWithTtlCheckAndUpdate(() -> original.get(key), v -> 
original.put(key, v), () -> original.remove(key));
+   }
+
+   @Override
+   public void put(UK key, UV value) throws Exception {
+   original.put(key, wrapWithTs(value));
+   }
+
+   @Override
+   public void putAll(Map<UK, UV> map) throws Exception {
+   if (map == null) {
+   return;
+   }
+   Map<UK, TtlValue<UV>> ttlMap = new HashMap<>();
--- End diff --

We can already initialize the new map with `map.size()`.
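
For illustration, a generic sketch of that suggestion (the helper class and 
method are made up, not part of the patch):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class PreSizedMapCopy {

	// Pass the source size as the initial capacity so the copy loop
	// triggers fewer (usually no) rehash operations while filling the map.
	static <K, V, W> Map<K, W> copyAndWrap(Map<K, V> source, Function<V, W> wrap) {
		Map<K, W> target = new HashMap<>(source.size());
		for (Map.Entry<K, V> entry : source.entrySet()) {
			target.put(entry.getKey(), wrap.apply(entry.getValue()));
		}
		return target;
	}
}
```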


---


[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...

2018-06-29 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/6228
  
CC @azagrebin  @sihuazhou 


---

