[GitHub] flink issue #6388: [FLINK-6222] Allow passing env variables to start scripts...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6388 LGTM, except for the one remaining indentation problem mentioned by @zentol. ---
[GitHub] flink pull request #6351: [FLINK-9862] [test] Extend general purpose DataSt...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6351#discussion_r204345031

--- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java ---
@@ -197,6 +210,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
             for (KeyRangeStates keyRange : snapshotKeyRanges.get()) {
                 keyRanges.add(keyRange);
             }
+
+            // let event time start from the max of all event time progress across subtasks in the last execution
+            for (Long lastEventTime : lastEventTimes.get()) {
+                monotonousEventTime = Math.max(monotonousEventTime, lastEventTime);
--- End diff --

Yes, I think it should be fine because it only affects the generator. Just wanted to double-check.

---
[GitHub] flink issue #6351: [FLINK-9862] [test] Extend general purpose DataStream te...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6351 Had one question. Otherwise LGTM. ---
[GitHub] flink pull request #6351: [FLINK-9862] [test] Extend general purpose DataSt...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6351#discussion_r204336316

--- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java ---
@@ -197,6 +210,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
             for (KeyRangeStates keyRange : snapshotKeyRanges.get()) {
                 keyRanges.add(keyRange);
             }
+
+            // let event time start from the max of all event time progress across subtasks in the last execution
+            for (Long lastEventTime : lastEventTimes.get()) {
+                monotonousEventTime = Math.max(monotonousEventTime, lastEventTime);
--- End diff --

Otherwise, you would really need to track the watermark per key-group partition.

---
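A minimal, JDK-only sketch of what tracking event-time progress per key-group partition could look like. Everything here is hypothetical: `KeyGroupEventTimes`, its methods, and the simplified `keyGroupFor` assignment (Flink's real assignment additionally murmur-hashes the key's `hashCode()`) are illustrations, not code from the PR. On restore, each subtask keeps only the key groups it now owns, so rescaling never mixes progress across key groups, and a subtask's watermark is bounded by its slowest key group.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical: per-key-group event-time progress of a generator source. */
final class KeyGroupEventTimes {

    private final Map<Integer, Long> lastEventTimeByKeyGroup = new HashMap<>();

    /** Simplified key-group assignment (Flink's real one also murmur-hashes the hashCode). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Records that an event with the given event time was emitted for a key group. */
    void onEmit(int keyGroup, long eventTime) {
        lastEventTimeByKeyGroup.merge(keyGroup, eventTime, Long::max);
    }

    /** Copy of the progress, as it would be written into a checkpoint. */
    Map<Integer, Long> snapshot() {
        return new HashMap<>(lastEventTimeByKeyGroup);
    }

    /** On restore, keep only the key groups in [start, end] that this subtask now owns. */
    static KeyGroupEventTimes restore(Map<Integer, Long> snapshot, int start, int end) {
        KeyGroupEventTimes restored = new KeyGroupEventTimes();
        snapshot.forEach((keyGroup, eventTime) -> {
            if (keyGroup >= start && keyGroup <= end) {
                restored.lastEventTimeByKeyGroup.put(keyGroup, eventTime);
            }
        });
        return restored;
    }

    /** The subtask's watermark cannot pass its slowest key group. */
    long subtaskWatermark() {
        return lastEventTimeByKeyGroup.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```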
[GitHub] flink pull request #6351: [FLINK-9862] [test] Extend general purpose DataSt...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6351#discussion_r204336026

--- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java ---
@@ -197,6 +210,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
             for (KeyRangeStates keyRange : snapshotKeyRanges.get()) {
                 keyRanges.add(keyRange);
             }
+
+            // let event time start from the max of all event time progress across subtasks in the last execution
+            for (Long lastEventTime : lastEventTimes.get()) {
+                monotonousEventTime = Math.max(monotonousEventTime, lastEventTime);
--- End diff --

I wonder why we compute the event time as the max rather than the min, as we usually would for a combined watermark. This is probably never rescaled anyway, but it still looks a bit suspicious.

---
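To make the min-versus-max question concrete, here is a small hypothetical comparison (class and method names are made up for illustration). A combined watermark must not advance past its slowest input, so it takes the minimum; the loop in the diff takes the maximum instead, which presumably keeps newly generated timestamps from falling behind anything any subtask emitted before the restore. That is acceptable for a data generator but would be wrong for a watermark.

```java
import java.util.List;

/** Hypothetical comparison of two ways to combine per-subtask event-time progress. */
final class CombinedEventTime {

    /** Watermark semantics: overall progress is bounded by the slowest subtask. */
    static long combinedWatermark(List<Long> lastEventTimes) {
        return lastEventTimes.stream().mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
    }

    /** Mirrors the loop in the diff: resume from the furthest progress of any subtask. */
    static long restoredGeneratorStart(long monotonousEventTime, List<Long> lastEventTimes) {
        long result = monotonousEventTime;
        for (Long lastEventTime : lastEventTimes) {
            result = Math.max(result, lastEventTime);
        }
        return result;
    }
}
```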
[GitHub] flink issue #6376: [FLINK-9902][tests] Improve and refactor window checkpoin...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6376 Thanks for the fast review, @aljoscha. I addressed the comment about the `while` loop and will merge now. ---
[GitHub] flink pull request #6376: [FLINK-9902][tests] Improve and refactor window ch...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6376#discussion_r203982061 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertEquals; + +/** + * Source for window checkpointing IT cases that can introduce artificial failures. 
+ */ +public class FailingSource extends RichSourceFunction> + implements ListCheckpointed, CheckpointListener { + + /** +* Function to generate and emit the test events (and watermarks if required). +*/ + @FunctionalInterface + public interface EventEmittingGenerator extends Serializable { + void emitEvent(SourceContext> ctx, int eventSequenceNo); + } + + private static final long INITIAL = Long.MIN_VALUE; + private static final long STATEFUL_CHECKPOINT_COMPLETED = Long.MIN_VALUE; + + @Nonnull + private final EventEmittingGenerator eventEmittingGenerator; + private final int expectedEmitCalls; + private final int failureAfterNumElements; + private final boolean usingProcessingTime; + private final AtomicLong checkpointStatus; + + private int emitCallCount; + private volatile boolean running; + + public FailingSource( + @Nonnull EventEmittingGenerator eventEmittingGenerator, + @Nonnegative int numberOfGeneratorInvocations) { + this(eventEmittingGenerator, numberOfGeneratorInvocations, TimeCharacteristic.EventTime); + } + + public FailingSource( + @Nonnull EventEmittingGenerator eventEmittingGenerator, + @Nonnegative int numberOfGeneratorInvocations, + @Nonnull TimeCharacteristic timeCharacteristic) { + this.eventEmittingGenerator = eventEmittingGenerator; + this.running = true; + this.emitCallCount = 0; + this.expectedEmitCalls = numberOfGeneratorInvocations; + this.failureAfterNumElements = numberOfGeneratorInvocations / 2; + this.checkpointStatus = new AtomicLong(INITIAL); + this.usingProcessingTime = timeCharacteristic == TimeCharacteristic.ProcessingTime; + } + + @Override + public void open(Configuration parameters) { + // non-parallel source + assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + } + + @Override + public void run(SourceContext> ctx) throws Exception { + + final RuntimeContext runtimeContext = getRuntimeContext(); + // detect if this task is "the chosen one" and should fail (via subtaskidx), if it did not fail before (via 
attempt) + final boolean failThisTask = + runtimeContext.getAttemptNumber() == 0 && runtimeContext.getIndexOfThisSubtask() == 0; + + // we loop longer than we have elements, to permit delayed checkpoints + // to still cause a failure + while (running) { + + // the function failed before
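The failure logic visible in this diff (fail only on attempt 0 of subtask 0, after half of the generator invocations, and keep looping past the last element so that a delayed checkpoint can still trigger the failure) can be simulated in plain Java. This is a hypothetical simplification: the part of the real `FailingSource` that updates `checkpointStatus` is not shown in this excerpt, so checkpoint completion is modeled here as a fixed iteration, and `FailOnceSimulation` and `simulateFirstAttempt` are invented names.

```java
/** Hypothetical simulation of FailingSource's fail-once behavior on its first attempt. */
final class FailOnceSimulation {

    /** Same condition as in the diff: only the first attempt of subtask 0 fails. */
    static boolean isChosenToFail(int attemptNumber, int subtaskIndex) {
        return attemptNumber == 0 && subtaskIndex == 0;
    }

    /**
     * Returns the loop iteration at which the chosen task would throw, or -1 if it never does.
     * A checkpoint is assumed to complete at {@code checkpointCompletesAt}; the loop keeps running
     * after the last element so that a late checkpoint can still trigger the failure.
     */
    static int simulateFirstAttempt(int expectedEmitCalls, int checkpointCompletesAt, int maxIterations) {
        final int failureAfterNumElements = expectedEmitCalls / 2;
        int emitCallCount = 0;
        for (int iteration = 0; iteration < maxIterations; iteration++) {
            boolean checkpointCompleted = iteration >= checkpointCompletesAt;
            if (emitCallCount >= failureAfterNumElements && checkpointCompleted) {
                return iteration; // the real source would throw an exception here
            }
            if (emitCallCount < expectedEmitCalls) {
                emitCallCount++; // emit one more event
            }
        }
        return -1;
    }
}
```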
[GitHub] flink pull request #6376: [FLINK-9902][tests] Improve and refactor window ch...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6376#discussion_r203981882 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/ValidatingSink.java --- @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.test.util.SuccessException; + +import javax.annotation.Nonnull; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Generalized sink for validation of window checkpointing IT cases. + */ +public class ValidatingSink extends RichSinkFunction + implements ListCheckpointed> { + + /** +* Function to check if the window counts are as expected. 
+*/ + @FunctionalInterface + public interface ResultChecker extends Serializable { + boolean checkResult(Map windowCounts); + } + + /** +* Function that updates the window counts from an update event. +* +* @param type of the update event. +*/ + public interface CountUpdater extends Serializable { + void updateCount(T update, Map windowCounts); + } + + @Nonnull + private final ResultChecker resultChecker; + + @Nonnull + private final CountUpdater countUpdater; + + @Nonnull + private final HashMap windowCounts; + + private final boolean usingProcessingTime; + + public ValidatingSink( + @Nonnull CountUpdater countUpdater, + @Nonnull ResultChecker resultChecker) { + this(countUpdater, resultChecker, TimeCharacteristic.EventTime); + } + + public ValidatingSink( + @Nonnull CountUpdater countUpdater, + @Nonnull ResultChecker resultChecker, + @Nonnull TimeCharacteristic timeCharacteristic) { + + this.resultChecker = resultChecker; + this.countUpdater = countUpdater; + this.usingProcessingTime = TimeCharacteristic.ProcessingTime == timeCharacteristic; + this.windowCounts = new HashMap<>(); + } + + @Override + public void open(Configuration parameters) throws Exception { + // this sink can only work with DOP 1 + assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + if (usingProcessingTime && resultChecker.checkResult(windowCounts)) { --- End diff -- Was also not 100% sure if this is needed. My reason was that in theory we could have restored from a checkpoint of a completed job that will not emit events, and we will not get into `close()` unless we somehow fire a `SuccessException`. ---
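A stripped-down, JDK-only sketch of the behavior discussed here: in processing time the sink signals success by throwing as soon as the counts are complete, and checking once in `open()` covers the case where it was restored from a checkpoint of a run that had already finished and will therefore receive no further events. All names are stand-ins: the nested `SuccessException` replaces Flink's test-util class, the window counts are assumed to be a `Map<Long, Integer>`, and `restoreState` takes a single map instead of a list of snapshots. Treat it as an illustration, not the PR's code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, Flink-free sketch of ValidatingSink's processing-time success check. */
final class ValidatingSinkSketch<T> {

    interface ResultChecker {
        boolean checkResult(Map<Long, Integer> windowCounts);
    }

    interface CountUpdater<U> {
        void updateCount(U update, Map<Long, Integer> windowCounts);
    }

    /** Stand-in for the SuccessException used by Flink's test utilities. */
    static final class SuccessException extends RuntimeException {
    }

    private final CountUpdater<T> countUpdater;
    private final ResultChecker resultChecker;
    private final boolean usingProcessingTime;
    private final Map<Long, Integer> windowCounts = new HashMap<>();

    ValidatingSinkSketch(CountUpdater<T> countUpdater, ResultChecker resultChecker, boolean usingProcessingTime) {
        this.countUpdater = countUpdater;
        this.resultChecker = resultChecker;
        this.usingProcessingTime = usingProcessingTime;
    }

    /** Simplified restore: a single map instead of a list of snapshots. */
    void restoreState(Map<Long, Integer> snapshot) {
        windowCounts.putAll(snapshot);
    }

    /** If restored from a finished run, no event may ever arrive, so check right away. */
    void open() {
        checkForSuccess();
    }

    void invoke(T value) {
        countUpdater.updateCount(value, windowCounts);
        checkForSuccess();
    }

    private void checkForSuccess() {
        if (usingProcessingTime && resultChecker.checkResult(windowCounts)) {
            throw new SuccessException();
        }
    }
}
```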
[GitHub] flink pull request #6376: [FLINK-9902][tests] Improve and refactor window ch...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6376#discussion_r203978845

--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java ---
@@ -92,19 +89,21 @@ private static Configuration getConfiguration() {
     @Test
     public void testTumblingProcessingTimeWindow() {
         final int numElements = 3000;
-        FailingSource.reset();
         try {
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             env.setParallelism(PARALLELISM);
             env.setStreamTimeCharacteristic(timeCharacteristic);
             env.getConfig().setAutoWatermarkInterval(10);
             env.enableCheckpointing(100);
-            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
--- End diff --

Yep, and it is also no longer required, because we no longer use `SuccessException` where it is not needed.

---
[GitHub] flink pull request #6376: [FLINK-9902][tests] Improve and refactor window ch...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6376#discussion_r203978691 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/FailingSource.java --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing.utils; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.CheckpointListener; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertEquals; + +/** + * Source for window checkpointing IT cases that can introduce artificial failures. 
+ */ +public class FailingSource extends RichSourceFunction> + implements ListCheckpointed, CheckpointListener { + + /** +* Function to generate and emit the test events (and watermarks if required). +*/ + @FunctionalInterface + public interface EventEmittingGenerator extends Serializable { + void emitEvent(SourceContext> ctx, int eventSequenceNo); + } + + private static final long INITIAL = Long.MIN_VALUE; + private static final long STATEFUL_CHECKPOINT_COMPLETED = Long.MIN_VALUE; + + @Nonnull + private final EventEmittingGenerator eventEmittingGenerator; + private final int expectedEmitCalls; + private final int failureAfterNumElements; + private final boolean usingProcessingTime; + private final AtomicLong checkpointStatus; + + private int emitCallCount; + private volatile boolean running; + + public FailingSource( + @Nonnull EventEmittingGenerator eventEmittingGenerator, + @Nonnegative int numberOfGeneratorInvocations) { + this(eventEmittingGenerator, numberOfGeneratorInvocations, TimeCharacteristic.EventTime); + } + + public FailingSource( + @Nonnull EventEmittingGenerator eventEmittingGenerator, + @Nonnegative int numberOfGeneratorInvocations, + @Nonnull TimeCharacteristic timeCharacteristic) { + this.eventEmittingGenerator = eventEmittingGenerator; + this.running = true; + this.emitCallCount = 0; + this.expectedEmitCalls = numberOfGeneratorInvocations; + this.failureAfterNumElements = numberOfGeneratorInvocations / 2; + this.checkpointStatus = new AtomicLong(INITIAL); + this.usingProcessingTime = timeCharacteristic == TimeCharacteristic.ProcessingTime; + } + + @Override + public void open(Configuration parameters) { + // non-parallel source + assertEquals(1, getRuntimeContext().getNumberOfParallelSubtasks()); + } + + @Override + public void run(SourceContext> ctx) throws Exception { + + final RuntimeContext runtimeContext = getRuntimeContext(); + // detect if this task is "the chosen one" and should fail (via subtaskidx), if it did not fail before (via 
attempt) + final boolean failThisTask = + runtimeContext.getAttemptNumber() == 0 && runtimeContext.getIndexOfThisSubtask() == 0; + + // we loop longer than we have elements, to permit delayed checkpoints + // to still cause a failure + while (running) { + + // the function failed before
[GitHub] flink pull request #6376: [FLINK-9902][tests] Improve and refactor window ch...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6376#discussion_r203978368

--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java ---
@@ -88,20 +80,17 @@ public void testTumblingTimeWindow() {
         final int numElementsPerKey = 3000;
         final int windowSize = 100;
         final int numKeys = 1;
-        FailingSource.reset();
         try {
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             env.setParallelism(PARALLELISM);
             env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
             env.enableCheckpointing(100);
-            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 0));
+            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
             env.getConfig().disableSysoutLogging();
             env
-                    .addSource(new FailingSource(numKeys,
-                            numElementsPerKey,
-                            numElementsPerKey / 3))
+                    .addSource(new FailingSource(new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey))
--- End diff --

I would only start duplicating them once they diverge and start to fail :-)

---
[GitHub] flink pull request #6376: [FLINK-9902][tests] Improve and refactor window ch...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6376#discussion_r203978136

--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java ---
@@ -133,9 +122,12 @@ public void apply(
                         out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
                     }
                 })
-                .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+                .addSink(new ValidatingSink<>(
+                    new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun(numElementsPerKey),
+                    new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize)))
+                .setParallelism(1);
-            tryExecute(env, "Tumbling Window Test");
+            env.execute("Tumbling Window Test");
--- End diff --

Same reasoning as for the identical change in `EventTimeWindowCheckpointingITCase`: in event time the source ends deterministically, so no `SuccessException` is needed.

---
[GitHub] flink pull request #6376: [FLINK-9902][tests] Improve and refactor window ch...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6376#discussion_r203978068

--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java ---
@@ -299,12 +288,17 @@ public void apply(
                         sum += value.f1.value;
                         key = value.f0;
                     }
-                    out.collect(new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum)));
+
+                    final Tuple4 result =
+                        new Tuple4<>(key, window.getStart(), window.getEnd(), new IntType(sum));
+                    out.collect(result);
                 }
             })
-            .addSink(new ValidatingSink(numKeys, numElementsPerKey / windowSize)).setParallelism(1);
+            .addSink(new ValidatingSink<>(
+                new SinkValidatorUpdateFun(numElementsPerKey),
+                new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1);
-            tryExecute(env, "Tumbling Window Test");
+            env.execute("Tumbling Window Test");
--- End diff --

Because this test does not need a `SuccessException`: in event time, the end of the source function is deterministic.

---
[GitHub] flink issue #6361: [FLINK-9858] [tests] State TTL End-to-End Test
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6361 Very good work, LGTM. Merging. ---
[GitHub] flink pull request #6351: [FLINK-9862] [test] Extend general purpose DataSt...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6351#discussion_r203382388

--- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SemanticsCheckMapper.java ---
@@ -56,6 +56,7 @@ public void flatMap(Event event, Collector out) throws Exception {
         if (validator.check(currentValue, nextValue)) {
             sequenceValue.update(nextValue);
         } else {
+            sequenceValue.update(nextValue);
--- End diff --

Since the state is now updated in both branches, this could be simplified to:

```
sequenceValue.update(nextValue);
if (!validator.check(currentValue, nextValue)) {
    out.collect("Alert: " + currentValue + " -> " + nextValue + " (" + event.getKey() + ")");
}
```

---
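A runnable, JDK-only sketch of the suggested flow for a single key, with the keyed `ValueState` replaced by a plain field, the `Collector` by a list, and the validator assumed to require `next == current + 1` (all hypothetical simplifications). Because the state always advances, a single gap produces exactly one alert and checking resumes from the new value instead of cascading.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

/** Hypothetical, Flink-free sketch of the suggested SemanticsCheckMapper flow for one key. */
final class SemanticsCheckSketch {

    private final BiPredicate<Long, Long> validator;
    private long sequenceValue;                  // stand-in for the keyed ValueState
    final List<String> out = new ArrayList<>();  // stand-in for the Collector

    SemanticsCheckSketch(long initialValue, BiPredicate<Long, Long> validator) {
        this.sequenceValue = initialValue;
        this.validator = validator;
    }

    void flatMap(int key, long nextValue) {
        long currentValue = sequenceValue;
        // Always advance the state, then alert only if the transition was invalid.
        sequenceValue = nextValue;
        if (!validator.test(currentValue, nextValue)) {
            out.add("Alert: " + currentValue + " -> " + nextValue + " (" + key + ")");
        }
    }
}
```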
[GitHub] flink pull request #6351: [FLINK-9862] [test] Extend general purpose DataSt...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6351#discussion_r203380952

--- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java ---
@@ -184,12 +189,16 @@ private static final ConfigOption SEQUENCE_GENERATOR_SRC_EVENT_TIME_MAX_OUT_OF_ORDERNESS = ConfigOptions
         .key("sequence_generator_source.event_time.max_out_of_order")
-        .defaultValue(500L);
+        .defaultValue(0L);
     private static final ConfigOption SEQUENCE_GENERATOR_SRC_EVENT_TIME_CLOCK_PROGRESS = ConfigOptions
         .key("sequence_generator_source.event_time.clock_progress")
         .defaultValue(100L);
+    private static final ConfigOption TUMBLING_WINDOW_OPERATOR_NUM_EVENTS = ConfigOptions
+        .key("sliding_window_operator.num_events")
--- End diff --

`tumbling` instead of `sliding`?

---
[GitHub] flink pull request #6361: [FLINK-9858] [tests] State TTL End-to-End Test
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6361#discussion_r203377098

--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -240,6 +240,15 @@ function start_cluster {
   done
 }
+function start_taskmanagers {
--- End diff --

I think you could just reuse the function `tm_watchdog` for this purpose.

---
[GitHub] flink pull request #6361: [FLINK-9858] [tests] State TTL End-to-End Test
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6361#discussion_r203372004 --- Diff: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/TtlUpdateContext.java --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.tests.verify; + +import javax.annotation.Nonnull; + +import java.io.Serializable; + +/** Contains relevant for state TTL update data. */ +public class TtlUpdateContext implements Serializable { + private final int key; + + @Nonnull + private final String verifierId; + + private final GV valueBeforeUpdate; + private final UV update; + private final GV updatedValue; + + private final long timestamp; + + public TtlUpdateContext(int key, @Nonnull String verifierId, GV valueBeforeUpdate, UV update, GV updatedValue) { + this(key, verifierId, valueBeforeUpdate, update, updatedValue, System.currentTimeMillis()); --- End diff -- This is recording the timestamp with some imprecision from the actual time that the TTL state saw. On the test server, this time difference can be significant and lead to flaky tests, even with your hardcoded imprecision. 
Instead, we could record a timestamp `t1` before accessing the TTL state and a timestamp `t2` after accessing it. All state whose expiration time is < `t1` must be expired, all state whose expiration time is > `t2` must still be present, and cases where the expiration time falls between `t1` and `t2` can be ignored, because we cannot decide whether the access saw a time before or after the expiration. ---
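The `t1`/`t2` bracketing proposed above can be sketched as follows. The class, the `Verdict` enum, and the use of an absolute expiration timestamp (last update time plus TTL) are assumptions made for illustration. Only accesses whose expiration falls strictly outside `[t1, t2]` are checked; the ambiguous window is skipped rather than covered by a hardcoded precision.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

/** Hypothetical sketch of TTL verification that brackets each state access with t1 and t2. */
final class TtlBracketCheck {

    enum Verdict { MUST_BE_EXPIRED, MUST_BE_PRESENT, UNDECIDABLE }

    /** expirationTime is assumed to be the last update timestamp plus the TTL. */
    static Verdict classify(long expirationTime, long t1, long t2) {
        if (expirationTime < t1) {
            return Verdict.MUST_BE_EXPIRED;   // expired before the access started
        }
        if (expirationTime > t2) {
            return Verdict.MUST_BE_PRESENT;   // still valid after the access ended
        }
        return Verdict.UNDECIDABLE;           // expiration overlaps the access window
    }

    static boolean isConsistent(long expirationTime, long t1, long t2, boolean observedPresent) {
        switch (classify(expirationTime, t1, t2)) {
            case MUST_BE_EXPIRED:
                return !observedPresent;
            case MUST_BE_PRESENT:
                return observedPresent;
            default:
                return true;                  // ambiguous: do not fail the test
        }
    }

    /** Reads the clock immediately before and after the state access. */
    static boolean accessAndVerify(LongSupplier clock, long expirationTime, BooleanSupplier stateAccess) {
        long t1 = clock.getAsLong();
        boolean observedPresent = stateAccess.getAsBoolean();
        long t2 = clock.getAsLong();
        return isConsistent(expirationTime, t1, t2, observedPresent);
    }
}
```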
[GitHub] flink pull request #6361: [FLINK-9858] [tests] State TTL End-to-End Test
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6361#discussion_r203363708 --- Diff: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.state.StateTtlConfiguration; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; +import org.apache.flink.streaming.tests.verify.TtlUpdateContext; + +import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.SEQUENCE_GENERATOR_SRC_KEYSPACE; +import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS; +import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.SEQUENCE_GENERATOR_SRC_SLEEP_TIME; +import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment; + +/** + * A test job for State TTL feature. + */ +public class DataStreamStateTTLTestProgram { + private static final ConfigOption STATE_TTL_VERIFIER_TTL_MILLI = ConfigOptions --- End diff -- (also for the ones we take from the allround job.) ---
[GitHub] flink pull request #6361: [FLINK-9858] [tests] State TTL End-to-End Test
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6361#discussion_r203362731 --- Diff: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/DataStreamStateTTLTestProgram.java --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.tests; + +import org.apache.flink.api.common.state.StateTtlConfiguration; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; +import org.apache.flink.streaming.tests.verify.TtlUpdateContext; + +import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.SEQUENCE_GENERATOR_SRC_KEYSPACE; +import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS; +import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.SEQUENCE_GENERATOR_SRC_SLEEP_TIME; +import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment; + +/** + * A test job for State TTL feature. + */ +public class DataStreamStateTTLTestProgram { + private static final ConfigOption STATE_TTL_VERIFIER_TTL_MILLI = ConfigOptions --- End diff -- Please add documentation for the config parameters to the class-level javadoc ---
[GitHub] flink pull request #6361: [FLINK-9858] [tests] State TTL End-to-End Test
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6361#discussion_r203361200 --- Diff: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.tests.verify; + +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.StateTtlConfiguration; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.FunctionInitializationContext; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.stream.IntStream; + +abstract class AbstractTtlStateVerifier, S extends State, SV, UV, GV> + implements TtlStateVerifier { + static final Random RANDOM = new Random(); + + @Nonnull + final D stateDesc; + + AbstractTtlStateVerifier(@Nonnull D stateDesc) { + this.stateDesc = stateDesc; + } + + @Nonnull + static String randomString() { + StringBuilder sb = new StringBuilder(); + IntStream.range(0, RANDOM.nextInt(14) + 2).forEach(i -> sb.append(randomChar())); + return sb.toString(); + } + + private static char randomChar() { + char d = (char) ('0' + RANDOM.nextInt(9)); + char l = (char) ('a' + RANDOM.nextInt(25)); + return RANDOM.nextBoolean() ? 
d : l; + } + + @SuppressWarnings("unchecked") + @Override + @Nonnull + public State createState(@Nonnull FunctionInitializationContext context, @Nonnull StateTtlConfiguration ttlConfig) { + stateDesc.enableTimeToLive(ttlConfig); + return createState(context); + } + + abstract State createState(FunctionInitializationContext context); + + @SuppressWarnings("unchecked") + @Override + @Nonnull + public TypeSerializer getUpdateSerializer() { + return (TypeSerializer) stateDesc.getSerializer(); + } + + @SuppressWarnings("unchecked") + @Override + public GV get(@Nonnull State state) throws Exception { + return getInternal((S) state); + } + + abstract GV getInternal(@Nonnull S state) throws Exception; + + @SuppressWarnings("unchecked") + @Override + public void update(@Nonnull State state, Object update) throws Exception { + updateInternal((S) state, (UV) update); + } + + abstract void updateInternal(@Nonnull S state, UV update) throws Exception; + + @SuppressWarnings("unchecked") + @Override + public boolean verify(@Nonnull TtlVerificationContext verificationContextRaw, @Nonnull Time precision) { --- End diff -- I think you can get rid of handling the wildcards `<?>` and the casting by introducing the following method in `TtlVerifyFunction`: ``` private void verify(TtlStateVerifier verifier, TtlVerificationContext verificationContext) { verifier.verify(verificationContext, precision); } ``` and calling it like this: ``` verify(TtlStateVerifier.VERIFIERS_BY_NAME.get(value.getVerifierId()), new TtlVerificationContext<>(prevValues, value)); ``` ---
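The suggestion above relies on Java's wildcard capture. A generic helper method gives the unknown type behind a `<?>` a name. Values produced for that type can then be handed back without casts or `@SuppressWarnings`. The angle brackets in the quoted snippet appear to have been stripped during archiving. The minimal sketch below therefore uses a hypothetical `Verifier<T>` interface and registry rather than Flink's actual `TtlStateVerifier` types.

```java
import java.util.HashMap;
import java.util.Map;

class WildcardCapture {

    // Hypothetical stand-in for a verifier interface; not Flink's TtlStateVerifier.
    interface Verifier<T> {
        T parse(String raw);

        boolean verify(T expected, T actual);
    }

    static final class IntVerifier implements Verifier<Integer> {
        @Override
        public Integer parse(String raw) {
            return Integer.parseInt(raw);
        }

        @Override
        public boolean verify(Integer expected, Integer actual) {
            return expected.equals(actual);
        }
    }

    static final class StringVerifier implements Verifier<String> {
        @Override
        public String parse(String raw) {
            return raw;
        }

        @Override
        public boolean verify(String expected, String actual) {
            return expected.equals(actual);
        }
    }

    // The registry mixes element types, so its values can only be typed as Verifier<?>.
    static final Map<String, Verifier<?>> VERIFIERS_BY_NAME = new HashMap<>();

    static {
        VERIFIERS_BY_NAME.put("int", new IntVerifier());
        VERIFIERS_BY_NAME.put("string", new StringVerifier());
    }

    static boolean check(String verifierId, String expectedRaw, String actualRaw) {
        Verifier<?> verifier = VERIFIERS_BY_NAME.get(verifierId);
        if (verifier == null) {
            throw new IllegalArgumentException("Unknown verifier: " + verifierId);
        }
        // verifier.verify(verifier.parse(...), verifier.parse(...)) would not compile here,
        // because each use of the Verifier<?> is captured as a different unknown type.
        return checkCaptured(verifier, expectedRaw, actualRaw);
    }

    // Inside this helper the unknown element type is named T, so no cast is needed.
    private static <T> boolean checkCaptured(Verifier<T> verifier, String expectedRaw, String actualRaw) {
        T expected = verifier.parse(expectedRaw);
        T actual = verifier.parse(actualRaw);
        return verifier.verify(expected, actual);
    }
}
```

The only unchecked-free entry point is the generic helper. The public method merely looks up the verifier and delegates to it.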
[GitHub] flink pull request #6361: [FLINK-9858] [tests] State TTL End-to-End Test
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6361#discussion_r203355448 --- Diff: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.tests.verify; + +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.StateTtlConfiguration; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.FunctionInitializationContext; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.stream.IntStream; + +abstract class AbstractTtlStateVerifier, S extends State, SV, UV, GV> + implements TtlStateVerifier { + static final Random RANDOM = new Random(); + + @Nonnull + final D stateDesc; + + AbstractTtlStateVerifier(@Nonnull D stateDesc) { + this.stateDesc = stateDesc; + } + + @Nonnull + static String randomString() { + StringBuilder sb = new StringBuilder(); + IntStream.range(0, RANDOM.nextInt(14) + 2).forEach(i -> sb.append(randomChar())); + return sb.toString(); + } + + private static char randomChar() { + char d = (char) ('0' + RANDOM.nextInt(9)); + char l = (char) ('a' + RANDOM.nextInt(25)); + return RANDOM.nextBoolean() ? 
d : l; + } + + @SuppressWarnings("unchecked") + @Override + @Nonnull + public State createState(@Nonnull FunctionInitializationContext context, @Nonnull StateTtlConfiguration ttlConfig) { + stateDesc.enableTimeToLive(ttlConfig); + return createState(context); + } + + abstract State createState(FunctionInitializationContext context); + + @SuppressWarnings("unchecked") + @Override + @Nonnull + public TypeSerializer getUpdateSerializer() { + return (TypeSerializer) stateDesc.getSerializer(); + } + + @SuppressWarnings("unchecked") + @Override + public GV get(@Nonnull State state) throws Exception { + return getInternal((S) state); + } + + abstract GV getInternal(@Nonnull S state) throws Exception; + + @SuppressWarnings("unchecked") + @Override + public void update(@Nonnull State state, Object update) throws Exception { + updateInternal((S) state, (UV) update); + } + + abstract void updateInternal(@Nonnull S state, UV update) throws Exception; + + @SuppressWarnings("unchecked") + @Override + public boolean verify(@Nonnull TtlVerificationContext verificationContextRaw, @Nonnull Time precision) { + TtlVerificationContext verificationContext = (TtlVerificationContext) verificationContextRaw; + if (!isWithinPrecision(verificationContext, precision)) { + return true; + } + List> updates = new ArrayList<>(verificationContext.getPrevUpdates()); + long currentTimestamp = verificationContext.getUpdateContext().getTimestamp(); + GV prevValue = expected(updates, currentTimestamp); + GV valueBeforeUpdate = verificationContext.getUpdateContext().getValueBeforeUpdate(); + TtlValue update = verificationContext.getUpdateContext().getUpdateWithTs(); + GV updatedValue = verificationContext.getUpdateContext().getUpdatedValue(); + updates.add(update); + GV expectedValue = expected(updates, currentTimestamp); +
[GitHub] flink pull request #6361: [FLINK-9858] [tests] State TTL End-to-End Test
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6361#discussion_r203353112 --- Diff: flink-end-to-end-tests/flink-stream-state-ttl-test/src/main/java/org/apache/flink/streaming/tests/verify/AbstractTtlStateVerifier.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.tests.verify; + +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.StateTtlConfiguration; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.FunctionInitializationContext; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.stream.IntStream; + +abstract class AbstractTtlStateVerifier, S extends State, SV, UV, GV> + implements TtlStateVerifier { + static final Random RANDOM = new Random(); + + @Nonnull + final D stateDesc; + + AbstractTtlStateVerifier(@Nonnull D stateDesc) { + this.stateDesc = stateDesc; + } + + @Nonnull + static String randomString() { --- End diff -- You could use the existing `StringUtil.getRandomString(...)` instead of your own random string generator. ---
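The quoted `randomChar()` calls `RANDOM.nextInt(9)` and `RANDOM.nextInt(25)`. Because `Random.nextInt(bound)` excludes `bound`, the generator never yields `'9'` or `'z'`. This is one more reason to prefer a shared utility over a hand-rolled one. The self-contained sketch below shows what such a helper would need to get right. It does not reproduce Flink's utility, and the class name `RandomStrings` is hypothetical. It draws uniformly from 36 characters, which deliberately differs from the quoted code's 50/50 choice between digits and letters.

```java
import java.util.Random;

class RandomStrings {

    private static final String ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyz";

    // Returns a string whose length lies in [minLength, maxLength], with every character
    // drawn uniformly from ALPHABET.
    static String randomString(Random rnd, int minLength, int maxLength) {
        if (minLength < 0 || maxLength < minLength) {
            throw new IllegalArgumentException("invalid length bounds");
        }
        // nextInt(bound) excludes bound, hence the +1 so that maxLength is reachable.
        int length = minLength + rnd.nextInt(maxLength - minLength + 1);
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            // nextInt(36) covers every index 0..35, including '9' and 'z'.
            sb.append(ALPHABET.charAt(rnd.nextInt(ALPHABET.length())));
        }
        return sb.toString();
    }
}
```

Calling `randomString(rnd, 2, 15)` yields the same length range as the quoted `RANDOM.nextInt(14) + 2`.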
[GitHub] flink issue #6302: [FLINK-9061][checkpointing] add entropy to s3 path for be...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6302 @indrc I think there is no contradiction between Stephan's comment and my suggestion about point 1. I stand by my point that this is a very common task and that nothing about the random string here is special enough to require a new method. My initial suggestions, `RandomStringUtils` and Flink's `UUID`, are both already available without adding another dependency. A full-text search for `randomstring` in the project also turned up hits such as `StringUtil` that could be used, or extended in a more general way, instead of putting our own algorithm in this particular place. ---
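To illustrate the point that existing JDK facilities already cover this task, the sketch below derives path entropy from `java.util.UUID.randomUUID()` without any custom character generator. It assumes that entropy is injected by replacing a placeholder in the checkpoint path. The marker `_entropy_` and the class name `PathEntropy` are hypothetical. They are not necessarily what FLINK-9061 uses.

```java
import java.util.UUID;

class PathEntropy {

    // Hypothetical placeholder; not necessarily the key used by Flink's S3 entropy injection.
    static final String ENTROPY_MARKER = "_entropy_";

    // Replaces every occurrence of ENTROPY_MARKER in the path with the same string of
    // entropyLength random lower-case hex characters taken from a random UUID.
    static String injectEntropy(String path, int entropyLength) {
        // The first 12 hex digits of a version-4 UUID are random; the 13th is always '4'.
        if (entropyLength < 1 || entropyLength > 12) {
            throw new IllegalArgumentException("entropyLength must be in [1, 12]");
        }
        String hex = UUID.randomUUID().toString().replace("-", "");
        return path.replace(ENTROPY_MARKER, hex.substring(0, entropyLength));
    }
}
```

For example, `injectEntropy("s3://bucket/_entropy_/checkpoints", 4)` yields a path such as `s3://bucket/<4 hex chars>/checkpoints`.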
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202556109 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/CachingInternalPriorityQueueSet.java --- @@ -80,13 +82,57 @@ public E peek() { @Override public void bulkPoll(@Nonnull Predicate canConsume, @Nonnull Consumer consumer) { + if (ENABLE_RELAXED_FIRING_ORDER_OPTIMIZATION) { + bulkPollRelaxedOrder(canConsume, consumer); + } else { + bulkPollStrictOrder(canConsume, consumer); + } + } + + private void bulkPollRelaxedOrder(@Nonnull Predicate canConsume, @Nonnull Consumer consumer) { + if (orderedCache.isEmpty()) { + bulkPollStore(canConsume, consumer); + } else { + while (!orderedCache.isEmpty() && canConsume.test(orderedCache.peekFirst())) { + final E next = orderedCache.removeFirst(); + orderedStore.remove(next); + consumer.accept(next); + } + + if (orderedCache.isEmpty()) { + bulkPollStore(canConsume, consumer); + } + } + } + + private void bulkPollStrictOrder(@Nonnull Predicate canConsume, @Nonnull Consumer consumer) { E element; while ((element = peek()) != null && canConsume.test(element)) { poll(); consumer.accept(element); } } + private void bulkPollStore(@Nonnull Predicate canConsume, @Nonnull Consumer consumer) { + try (CloseableIterator iterator = orderedStore.orderedIterator()) { + while (iterator.hasNext()) { + final E next = iterator.next(); + if (canConsume.test(next)) { + orderedStore.remove(next); + consumer.accept(next); + } else { + orderedCache.add(next); + while (iterator.hasNext() && !orderedCache.isFull()) { + orderedCache.add(iterator.next()); + } + break; + } + } + } catch (Exception e) { + throw new FlinkRuntimeException("Exception while bulk polling store.", e); --- End diff -- Why would you prefer that? Is there any better way for the caller to handle problems in this call (e.g. RocksDB problems) than failing the job? ---
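The strict-order path in the diff polls the queue head for as long as the predicate accepts it. The sketch below shows the same loop over a plain `java.util.PriorityQueue`, which stands in for Flink's internal priority queue set. `BulkPollSketch` is a hypothetical name, and timers are modeled as `Long` timestamps fired while they are at or below a watermark.

```java
import java.util.PriorityQueue;
import java.util.function.Consumer;
import java.util.function.Predicate;

class BulkPollSketch {

    // Polls elements in priority order while the current head satisfies canConsume,
    // passing each polled element to consumer. Polling stops at the first element that is
    // rejected, so that element and everything ordered after it stay in the queue.
    static <E> void bulkPollStrictOrder(
            PriorityQueue<E> queue,
            Predicate<? super E> canConsume,
            Consumer<? super E> consumer) {
        E element;
        while ((element = queue.peek()) != null && canConsume.test(element)) {
            queue.poll();
            consumer.accept(element);
        }
    }
}
```

With timers `{10, 20, 30, 40}` and the predicate `t -> t <= 25`, this fires `10` and `20` in that order and leaves `30` at the head.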
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6333#discussion_r202556091 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -446,8 +485,10 @@ public String toString() { @Override public int numStateEntries() { int sum = 0; - for (StateTable stateTable : stateTables.values()) { - sum += stateTable.size(); + for (StateSnapshotRestore stateTable : registeredStates.values()) { + if (stateTable instanceof StateTable) { + sum += ((StateTable) stateTable).size(); + } --- End diff -- This method is only used in some tests. To be on the safe side, it is probably only expected to count the keyed state and not any timers. ---
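The diff keeps `numStateEntries()` limited to keyed state by filtering the registered states with `instanceof`. The sketch below isolates that filter. `Snapshottable`, `Table` and `TimerQueue` are hypothetical stand-ins for `StateSnapshotRestore`, `StateTable` and the timer queue state.

```java
import java.util.Map;

class CountKeyedEntries {

    // Hypothetical stand-ins: every registered entry can be snapshotted, but only
    // key/value tables contribute to the entry count, while timer queues are skipped.
    interface Snapshottable {}

    static final class Table implements Snapshottable {
        private final int size;

        Table(int size) {
            this.size = size;
        }

        int size() {
            return size;
        }
    }

    static final class TimerQueue implements Snapshottable {
        private final int size;

        TimerQueue(int size) {
            this.size = size;
        }

        int size() {
            return size;
        }
    }

    // Sums the sizes of the tables only, so timers do not change the count tests rely on.
    static int numStateEntries(Map<String, Snapshottable> registeredStates) {
        int sum = 0;
        for (Snapshottable state : registeredStates.values()) {
            if (state instanceof Table) {
                sum += ((Table) state).size();
            }
        }
        return sum;
    }
}
```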
[GitHub] flink issue #6333: [FLINK-9489] Checkpoint timers as part of managed keyed s...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6333 CC @tillrohrmann ---
[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...
GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/6333

[FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

## What is the purpose of the change

This PR integrates priority queue state (timers) with the snapshotting of Flink's state backends. It also already includes backwards compatibility (FLINK-9490). The core idea is to have a common abstraction for how state is registered in the state backend and how snapshots operate on such state (`StateSnapshotRestore`, `RegisteredStateMetaInfoBase`). With this, the new state integrates more or less seamlessly with the existing snapshot logic.

The notable exception is the current lack of integration between the RocksDB state backend and heap-based priority queue state. This combination can still use the old snapshot code through a temporary path without causing any regression (see `AbstractStreamOperator.snapshotState(...)`). As a result, after this PR Flink supports asynchronous snapshots for heap kv / heap queue, rocks kv / rocks queue (full and incremental), and rocks kv / heap queue (only full). It still uses synchronous snapshots for rocks kv / heap queue (only incremental).

This work was created in a bit of a rush to make it into the 1.6 release and still has some known rough edges that we could fix up in the test phase. Here is a list of some things that already come to my mind:

- Integrate heap-based timers with incremental RocksDB snapshots, then kick out some code.
- Check proper integration with the serializer upgrade story (!!)
- After that, we can also remove the key-partitioning in the set structure from `HeapPriorityQueueSet`.
- Improve integration of the batch wrapper.
- Improve the general state registration logic in the backends. There is potential to remove duplicated code and to improve the integration of the queue state a bit more.
- Improve performance of RocksDB based timers, e.g. a byte[] based cache, seeking directly to the next potential timer instead of to the key-group start, and bulkPoll.
- Improve some class/interface/method names.
- Add tests, e.g. for bulkPoll.

## Verifying this change

This change is already covered by existing tests, but I will add some more eventually. You can activate RocksDB based timers by using the RocksDB backend and setting `RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE` to `ROCKS`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (yes)
- The runtime per-record code paths (performance sensitive): (yes)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (JavaDocs only for now)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink pq-snapshot-integration

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6333.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #6333

commit 1bb8f70700deacc49a4d4ac7900425c10272959d
Author: Stefan Richter
Date: 2018-06-13T09:56:16Z

[FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

commit fc20df8268decab6d9890d617787a4084284b2f0
Author: Stefan Richter
Date: 2018-07-13T23:19:30Z

Optimization for relaxed bulk polls

commit 4db1bea90fd6881555172fe3d22ee928e97894a7
Author: Stefan Richter
Date: 2018-07-14T06:34:16Z

Renaming of some classes/interfaces

---
[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6325#discussion_r202295131 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1298,6 +1283,128 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyedBackendStateMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2> stateInfo) throws Exception { + + @SuppressWarnings("unchecked") + RegisteredKeyedBackendStateMetaInfo.Snapshot restoredMetaInfoSnapshot = + (RegisteredKeyedBackendStateMetaInfo.Snapshot) restoredKvStateMetaInfos.get( + stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, stateDesc); + + TypeSerializer stateSerializer = stateDesc.getSerializer(); + + RegisteredKeyedBackendStateMetaInfo newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( + stateDesc.getType(), + stateDesc.getName(), + namespaceSerializer, + stateSerializer); + + // check compatibility results to determine if state migration is required + TypeSerializerSchemaCompatibility namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(), + namespaceSerializer); + + TypeSerializerSchemaCompatibility stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(), + stateSerializer); + + if (namespaceCompatibility.isIncompatible()) { + throw new UnsupportedOperationException( + "Changing the namespace TypeSerializer in an incompatible way is currently not supported."); + } + + if 
(stateCompatibility.isIncompatible()) { + if (stateDesc.getType().equals(StateDescriptor.Type.MAP)) { + throw new UnsupportedOperationException( + "Changing the TypeSerializers of a MapState in an incompatible way is currently not supported."); + } + + LOG.info( + "Performing state migration for state {} because the state serializer changed in an incompatible way.", + stateDesc); + + // we need to get an actual state instance because migration is different + // for different state types. For example, ListState needs to deal with + // individual elements + StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass()); + if (stateFactory == null) { + String message = String.format("State %s is not supported by %s", + stateDesc.getClass(), this.getClass()); + throw new FlinkRuntimeException(message); + } + + State state = stateFactory.createState( + stateDesc, + Tuple2.of(stateInfo.f0, newMetaInfo), + RocksDBKeyedStateBackend.this); + + if (!(state instanceof AbstractRocksDBState)) { + throw new FlinkRuntimeException( + "State should be an AbstractRocksDBState but is " + state); + } + + AbstractRocksDBState rocksDBState = (AbstractRocksDBState) state; + + Snapshot rocksDBSnapshot = null; + RocksIteratorWrapper iterator = null; + + try (ReadOptions readOptions = new ReadOptions();) { + // TODO: can I do this with try-with-resource or do I always have to call +
[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6325#discussion_r202285840 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerConfigSnapshot.java --- @@ -99,7 +99,8 @@ // TODO this method actually should not have a default implementation; // TODO this placeholder should be removed as soon as all subclasses have a proper implementation in place, and // TODO the method is properly integrated in state backends' restore procedures - throw new UnsupportedOperationException(); +// throw new UnsupportedOperationException(); --- End diff -- Remove this line. ---
[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6325#discussion_r202293686 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1312,6 +1283,128 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyedBackendStateMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2> stateInfo) throws Exception { + + @SuppressWarnings("unchecked") + RegisteredKeyedBackendStateMetaInfo.Snapshot restoredMetaInfoSnapshot = + (RegisteredKeyedBackendStateMetaInfo.Snapshot) restoredKvStateMetaInfos.get( + stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, stateDesc); + + TypeSerializer stateSerializer = stateDesc.getSerializer(); + + RegisteredKeyedBackendStateMetaInfo newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( + stateDesc.getType(), + stateDesc.getName(), + namespaceSerializer, + stateSerializer); + + // check compatibility results to determine if state migration is required + TypeSerializerSchemaCompatibility namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(), + namespaceSerializer); + + TypeSerializerSchemaCompatibility stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(), + stateSerializer); + + if (namespaceCompatibility.isIncompatible()) { + throw new UnsupportedOperationException( + "Changing the namespace TypeSerializer in an incompatible way is currently not supported."); + } + + if 
(stateCompatibility.isIncompatible()) { + if (stateDesc.getType().equals(StateDescriptor.Type.MAP)) { + throw new UnsupportedOperationException( + "Changing the TypeSerializers of a MapState in an incompatible way is currently not supported."); + } + + LOG.info( + "Performing state migration for state {} because the state serializer changed in an incompatible way.", + stateDesc); + + // we need to get an actual state instance because migration is different + // for different state types. For example, ListState needs to deal with + // individual elements + StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass()); + if (stateFactory == null) { + String message = String.format("State %s is not supported by %s", + stateDesc.getClass(), this.getClass()); + throw new FlinkRuntimeException(message); + } + + State state = stateFactory.createState( + stateDesc, + Tuple2.of(stateInfo.f0, newMetaInfo), + RocksDBKeyedStateBackend.this); + + if (!(state instanceof AbstractRocksDBState)) { + throw new FlinkRuntimeException( + "State should be an AbstractRocksDBState but is " + state); + } + + AbstractRocksDBState rocksDBState = (AbstractRocksDBState) state; --- End diff -- Avoid using raw types for the reference. ---
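Following the "avoid raw types" remark, the sketch below shows the usual replacement for a raw reference after an `instanceof` check. The cast is to an unbounded wildcard type, which is a checked cast that needs no `@SuppressWarnings`. `TypedState<K, N, V>` is a hypothetical stand-in for `AbstractRocksDBState` and its actual type parameters.

```java
import java.util.ArrayList;
import java.util.List;

class AvoidRawTypes {

    // Hypothetical generic state class standing in for AbstractRocksDBState.
    static class TypedState<K, N, V> {
        final List<V> values = new ArrayList<>();

        int size() {
            return values.size();
        }
    }

    static int describe(Object state) {
        if (!(state instanceof TypedState)) {
            throw new IllegalStateException("State should be a TypedState but is " + state);
        }
        // The wildcard-typed reference keeps generic type checking in place. Reading is fine,
        // but typed.values.add("x") would not compile, whereas on a raw TypedState reference
        // it would compile with only an unchecked warning.
        TypedState<?, ?, ?> typed = (TypedState<?, ?, ?>) state;
        return typed.size();
    }
}
```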
[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6325#discussion_r202293217 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1312,6 +1283,128 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyedBackendStateMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2> stateInfo) throws Exception { + + @SuppressWarnings("unchecked") + RegisteredKeyedBackendStateMetaInfo.Snapshot restoredMetaInfoSnapshot = + (RegisteredKeyedBackendStateMetaInfo.Snapshot) restoredKvStateMetaInfos.get( + stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, stateDesc); + + TypeSerializer stateSerializer = stateDesc.getSerializer(); + + RegisteredKeyedBackendStateMetaInfo newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( + stateDesc.getType(), + stateDesc.getName(), + namespaceSerializer, + stateSerializer); + + // check compatibility results to determine if state migration is required + TypeSerializerSchemaCompatibility namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(), + namespaceSerializer); + + TypeSerializerSchemaCompatibility stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(), + stateSerializer); + + if (namespaceCompatibility.isIncompatible()) { + throw new UnsupportedOperationException( + "Changing the namespace TypeSerializer in an incompatible way is currently not supported."); + } + + if 
(stateCompatibility.isIncompatible()) { + if (stateDesc.getType().equals(StateDescriptor.Type.MAP)) { + throw new UnsupportedOperationException( + "Changing the TypeSerializers of a MapState in an incompatible way is currently not supported."); + } + + LOG.info( + "Performing state migration for state {} because the state serializer changed in an incompatible way.", + stateDesc); + + // we need to get an actual state instance because migration is different + // for different state types. For example, ListState needs to deal with + // individual elements + StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass()); + if (stateFactory == null) { + String message = String.format("State %s is not supported by %s", + stateDesc.getClass(), this.getClass()); + throw new FlinkRuntimeException(message); + } + + State state = stateFactory.createState( + stateDesc, + Tuple2.of(stateInfo.f0, newMetaInfo), + RocksDBKeyedStateBackend.this); + + if (!(state instanceof AbstractRocksDBState)) { + throw new FlinkRuntimeException( + "State should be an AbstractRocksDBState but is " + state); + } + + AbstractRocksDBState rocksDBState = (AbstractRocksDBState) state; + + Snapshot rocksDBSnapshot = null; + RocksIteratorWrapper iterator = null; + + try (ReadOptions readOptions = new ReadOptions();) { --- End diff -- I would suggest trying this: ``` Snapshot rocksDBSnapshot = db.getSnapshot();
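The suggestion above is cut off after its first line. The sketch below therefore only illustrates the general acquire/release pattern such migration code typically needs. It is not the reviewer's exact proposal. In RocksJava, a snapshot taken with `RocksDB.getSnapshot()` is handed out via `ReadOptions.setSnapshot(...)` and freed with `RocksDB.releaseSnapshot(...)`. To keep the sketch runnable without the native library, `FakeDb`, `FakeSnapshot` and `FakeReadOptions` are hypothetical stand-ins for those classes.

```java
class SnapshotReleaseSketch {

    // Hypothetical stand-ins for RocksJava's Snapshot, ReadOptions and RocksDB.
    static final class FakeSnapshot {}

    static final class FakeReadOptions implements AutoCloseable {
        FakeSnapshot snapshot;
        boolean closed;

        FakeReadOptions setSnapshot(FakeSnapshot snapshot) {
            this.snapshot = snapshot;
            return this;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    static final class FakeDb {
        int openSnapshots;

        FakeSnapshot getSnapshot() {
            openSnapshots++;
            return new FakeSnapshot();
        }

        void releaseSnapshot(FakeSnapshot snapshot) {
            openSnapshots--;
        }
    }

    // Runs the migration against a consistent view and always releases the snapshot,
    // even if the body throws. The read options are closed by try-with-resources first,
    // then the snapshot is released explicitly in finally.
    static void migrate(FakeDb db, Runnable migrationBody) {
        FakeSnapshot snapshot = db.getSnapshot();
        try (FakeReadOptions readOptions = new FakeReadOptions().setSnapshot(snapshot)) {
            migrationBody.run();
        } finally {
            db.releaseSnapshot(snapshot);
        }
    }
}
```

Releasing in `finally` rather than relying on try-with-resources for the snapshot mirrors how RocksJava expects snapshots to be returned to the database that created them.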
[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6325#discussion_r202293477 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1312,6 +1283,128 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyedBackendStateMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2> stateInfo) throws Exception { + + @SuppressWarnings("unchecked") + RegisteredKeyedBackendStateMetaInfo.Snapshot restoredMetaInfoSnapshot = + (RegisteredKeyedBackendStateMetaInfo.Snapshot) restoredKvStateMetaInfos.get( + stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, stateDesc); + + TypeSerializer stateSerializer = stateDesc.getSerializer(); + + RegisteredKeyedBackendStateMetaInfo newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( + stateDesc.getType(), + stateDesc.getName(), + namespaceSerializer, + stateSerializer); + + // check compatibility results to determine if state migration is required + TypeSerializerSchemaCompatibility namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(), + namespaceSerializer); + + TypeSerializerSchemaCompatibility stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(), + stateSerializer); + + if (namespaceCompatibility.isIncompatible()) { + throw new UnsupportedOperationException( + "Changing the namespace TypeSerializer in an incompatible way is currently not supported."); + } + + if 
(stateCompatibility.isIncompatible()) { + if (stateDesc.getType().equals(StateDescriptor.Type.MAP)) { + throw new UnsupportedOperationException( + "Changing the TypeSerializers of a MapState in an incompatible way is currently not supported."); + } + + LOG.info( + "Performing state migration for state {} because the state serializer changed in an incompatible way.", + stateDesc); + + // we need to get an actual state instance because migration is different + // for different state types. For example, ListState needs to deal with + // individual elements + StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass()); + if (stateFactory == null) { + String message = String.format("State %s is not supported by %s", + stateDesc.getClass(), this.getClass()); + throw new FlinkRuntimeException(message); + } + + State state = stateFactory.createState( + stateDesc, + Tuple2.of(stateInfo.f0, newMetaInfo), + RocksDBKeyedStateBackend.this); + + if (!(state instanceof AbstractRocksDBState)) { + throw new FlinkRuntimeException( + "State should be an AbstractRocksDBState but is " + state); + } + + AbstractRocksDBState rocksDBState = (AbstractRocksDBState) state; + + Snapshot rocksDBSnapshot = null; + RocksIteratorWrapper iterator = null; + + try (ReadOptions readOptions = new ReadOptions();) { + // TODO: can I do this with try-with-resource or do I always have to call +
[GitHub] flink pull request #6325: [FLINK-9376] Allow upgrading to incompatible state...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6325#discussion_r202289229 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1312,6 +1283,128 @@ private void copyStateDataHandleData( return Tuple2.of(stateInfo.f0, newMetaInfo); } + private RegisteredKeyedBackendStateMetaInfo migrateStateIfNecessary( + StateDescriptor stateDesc, + TypeSerializer namespaceSerializer, + Tuple2> stateInfo) throws Exception { + + @SuppressWarnings("unchecked") + RegisteredKeyedBackendStateMetaInfo.Snapshot restoredMetaInfoSnapshot = + (RegisteredKeyedBackendStateMetaInfo.Snapshot) restoredKvStateMetaInfos.get( + stateDesc.getName()); + + Preconditions.checkState( + restoredMetaInfoSnapshot != null, + "Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo," + + " but its corresponding restored snapshot cannot be found."); + + StateUtil.checkStateTypeCompatibility(restoredMetaInfoSnapshot, stateDesc); + + TypeSerializer stateSerializer = stateDesc.getSerializer(); + + RegisteredKeyedBackendStateMetaInfo newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( + stateDesc.getType(), + stateDesc.getName(), + namespaceSerializer, + stateSerializer); + + // check compatibility results to determine if state migration is required + TypeSerializerSchemaCompatibility namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getNamespaceSerializerConfigSnapshot(), + namespaceSerializer); + + TypeSerializerSchemaCompatibility stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( + restoredMetaInfoSnapshot.getStateSerializerConfigSnapshot(), + stateSerializer); + + if (namespaceCompatibility.isIncompatible()) { + throw new UnsupportedOperationException( + "Changing the namespace TypeSerializer in an incompatible way is currently not supported."); + } + + if 
(stateCompatibility.isIncompatible()) { --- End diff -- The handling of this branch could maybe go into its own private method, to break this big monolithic method down a bit. ---
[GitHub] flink issue #6313: [FLINK-9701] Add TTL in state descriptors
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6313 LGTM, nice work! Besides one comment about closing the backends after tests, the PR is ready. This is no big thing, so I will just fix it myself before merging now. ---
[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6313#discussion_r202130806 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java --- @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.RunnableFuture; + +/** Base class for state backend test context. 
*/ +public abstract class StateBackendTestContext { + private final StateBackend stateBackend; + private final CheckpointStorageLocation checkpointStorageLocation; + private final TtlTimeProvider timeProvider; + + private AbstractKeyedStateBackend keyedStateBackend; + + protected StateBackendTestContext(TtlTimeProvider timeProvider) { + this.timeProvider = Preconditions.checkNotNull(timeProvider); + this.stateBackend = Preconditions.checkNotNull(createStateBackend()); + this.checkpointStorageLocation = createCheckpointStorageLocation(); + } + + protected abstract StateBackend createStateBackend(); + + private CheckpointStorageLocation createCheckpointStorageLocation() { + try { + return stateBackend + .createCheckpointStorage(new JobID()) + .initializeLocationForCheckpoint(2L); + } catch (IOException e) { + throw new RuntimeException("unexpected"); + } + } + + void createAndRestoreKeyedStateBackend() { + Environment env = new DummyEnvironment(); + try { + if (keyedStateBackend != null) { + keyedStateBackend.dispose(); --- End diff -- There is a problem: the backend is only disposed here and not after each test, which leads to some native errors when I run the test. I suggest giving this context a `dispose` method and calling it in an `@After` method. ---
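A minimal sketch of the suggested cleanup pattern, assuming a hypothetical `FakeBackend` in place of the real keyed state backend and a simplified `TestContext` (neither is Flink's actual class). The context gets an idempotent `dispose()`, and the test's teardown calls it; in JUnit 4 that teardown method would carry `@After`, which is only shown as a comment so the sketch compiles without JUnit.

```java
/** Hypothetical stand-in for a keyed state backend that holds native resources. */
class FakeBackend {
    private boolean disposed;

    void dispose() { disposed = true; }

    boolean isDisposed() { return disposed; }
}

/** Simplified test context: owns at most one backend and can release it on demand. */
class TestContext {
    private FakeBackend backend;

    FakeBackend createAndRestoreBackend() {
        dispose(); // release the previous backend before creating a new one
        backend = new FakeBackend();
        return backend;
    }

    /** Proposed cleanup hook; idempotent, so it is safe to call from every teardown. */
    void dispose() {
        if (backend != null) {
            backend.dispose();
            backend = null;
        }
    }
}

/** Hypothetical test class showing where the cleanup call would go. */
class ExampleTest {
    private final TestContext ctx = new TestContext();

    FakeBackend runTest() {
        return ctx.createAndRestoreBackend();
    }

    // @After (JUnit 4 annotation, omitted here so the sketch has no JUnit dependency)
    void tearDown() {
        ctx.dispose();
    }
}
```

With this shape, a backend never outlives the test that created it, instead of lingering until the next `createAndRestoreKeyedStateBackend()` call.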
[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6313#discussion_r202103083 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -240,7 +243,7 @@ private boolean hasRegisteredState() { } @Override - public IS createState( + public IS createInternalState( --- End diff -- No, timers cannot use a state descriptor, because they cannot extend `State`. ---
[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6313#discussion_r202058484 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -240,7 +243,7 @@ private boolean hasRegisteredState() { } @Override - public IS createState( + public IS createInternalState( --- End diff -- Why are we adding `Internal` here? I would suggest calling the method `create(Internal?)KeyValueState`, because there will also be other kinds of state in the future (timers). ---
[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6313#discussion_r202057089 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java --- @@ -93,7 +94,7 @@ private static final long serialVersionUID = -8191916350224044011L; /** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */ - public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024; + private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024; --- End diff -- Ok, it is `PublicEvolving` so you can keep the change. ---
[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6313#discussion_r202056626 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java --- @@ -93,7 +94,7 @@ private static final long serialVersionUID = -8191916350224044011L; /** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */ - public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024; + private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024; --- End diff -- This should not be changed, because the class is a user-facing API and someone might have used it. ---
[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6308#discussion_r202047581 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedBackendSerializationProxy.java --- @@ -98,8 +103,7 @@ public int getVersion() { @Override public int[] getCompatibleVersions() { - // we are compatible with version 3 (Flink 1.3.x) and version 1 & 2 (Flink 1.2.x) - return new int[] {VERSION, 3, 2, 1}; + return new int[]{VERSION, 4, 3, 2, 1}; --- End diff -- Both styles are ok and used in Flink, so I will stick to this. ---
[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6308#discussion_r202047403 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/SerializationProxiesTest.java --- @@ -126,11 +131,11 @@ public void testKeyedBackendSerializationProxyRoundtripWithSerializerSerializati Assert.assertTrue(serializationProxy.getKeySerializer() instanceof UnloadableDummyTypeSerializer); Assert.assertEquals(keySerializer.snapshotConfiguration(), serializationProxy.getKeySerializerConfigSnapshot()); - for (RegisteredKeyedBackendStateMetaInfo.Snapshot meta : serializationProxy.getStateMetaInfoSnapshots()) { - Assert.assertTrue(meta.getNamespaceSerializer() instanceof UnloadableDummyTypeSerializer); - Assert.assertTrue(meta.getStateSerializer() instanceof UnloadableDummyTypeSerializer); - Assert.assertEquals(namespaceSerializer.snapshotConfiguration(), meta.getNamespaceSerializerConfigSnapshot()); - Assert.assertEquals(stateSerializer.snapshotConfiguration(), meta.getStateSerializerConfigSnapshot()); + for (StateMetaInfoSnapshot meta : serializationProxy.getStateMetaInfoSnapshots()) { --- End diff -- Good point ---
[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6313#discussion_r202042548 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java --- @@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception { @Override public Iterable> entries() throws Exception { - return entriesStream()::iterator; + return entries(e -> e); } - private Stream> entriesStream() throws Exception { + private Iterable entries( + Function, R> resultMapper) throws Exception { Iterable>> withTs = original.entries(); - withTs = withTs == null ? Collections.emptyList() : withTs; - return StreamSupport - .stream(withTs.spliterator(), false) - .filter(this::unexpiredAndUpdateOrCleanup) - .map(TtlMapState::unwrapWithoutTs); - } - - private boolean unexpiredAndUpdateOrCleanup(Map.Entry> e) { - UV unexpiredValue; - try { - unexpiredValue = getWithTtlCheckAndUpdate( - e::getValue, - v -> original.put(e.getKey(), v), - () -> original.remove(e.getKey())); - } catch (Exception ex) { - throw new FlinkRuntimeException(ex); - } - return unexpiredValue != null; - } - - private static Map.Entry unwrapWithoutTs(Map.Entry> e) { - return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue()); + return () -> new EntriesIterator<>(withTs == null ? 
Collections.emptyList() : withTs, resultMapper); } @Override public Iterable keys() throws Exception { - return entriesStream().map(Map.Entry::getKey)::iterator; + return entries(Map.Entry::getKey); } @Override public Iterable values() throws Exception { - return entriesStream().map(Map.Entry::getValue)::iterator; + return entries(Map.Entry::getValue); } @Override public Iterator> iterator() throws Exception { - return entriesStream().iterator(); + return entries().iterator(); } @Override public void clear() { original.clear(); } + + private class EntriesIterator implements Iterator { + private final Iterator>> originalIterator; + private final Function, R> resultMapper; + private Map.Entry nextUnexpired = null; + private boolean rightAfterNextIsCalled = false; + + private EntriesIterator( + @Nonnull Iterable>> withTs, + @Nonnull Function, R> resultMapper) { + this.originalIterator = withTs.iterator(); + this.resultMapper = resultMapper; + } + + @Override + public boolean hasNext() { + rightAfterNextIsCalled = false; + while (nextUnexpired == null && originalIterator.hasNext()) { + nextUnexpired = getUnexpiredAndUpdateOrCleanup(originalIterator.next()); + } + return nextUnexpired != null; + } + + @Override + public R next() { + if (hasNext()) { + rightAfterNextIsCalled = true; + R result = resultMapper.apply(nextUnexpired); + nextUnexpired = null; + return result; + } + throw new NoSuchElementException(); + } + + @Override + public void remove() { + if (rightAfterNextIsCalled) { --- End diff -- I agree, seems like there is no good solution for this. ---
[GitHub] flink issue #6308: [FLINK-9799] Generalize and unify state meta info snapsho...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6308 @sihuazhou @azagrebin thanks guys for the fast reviews! Will address the comments and merge. ---
[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6308#discussion_r202041046 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/metainfo/StateMetaInfoSnapshotReadersWriters.java --- @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.metainfo; + +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Static factory that gives out the write and readers for different versions of {@link StateMetaInfoSnapshot}. + */ +public class StateMetaInfoSnapshotReadersWriters { + + /** +* Current version for the serialization format of {@link StateMetaInfoSnapshotReadersWriters}. +* - v5: Flink 1.6.x +*/ + public static final int CURRENT_STATE_META_INFO_SNAPSHOT_VERSION = 5; + + /** +* Enum for backeards compatibility. This gives a hint about the expected state type for which a +* {@link StateMetaInfoSnapshot} should be deserialized. +* +* TODO this can go away after we eventually drop backwards compatibility with all versions < 5. +*/ + public enum StateTypeHint { + KEYED_STATE, + OPERATOR_STATE + } + + /** +* Returns the writer for {@link StateMetaInfoSnapshot}. +*/ + @Nonnull + public static StateMetaInfoWriter getWriter() { + return CurrentWriterImpl.INSTANCE; + } + + /** +* Returns a reader for {@link StateMetaInfoSnapshot} with the requested state type and version number. +* +* @param readVersion the format version to read. 
+* @param stateTypeHint a hint about the expected type to read. +* @return the requested reader. +*/ + @Nonnull + public static StateMetaInfoReader getReader(int readVersion, @Nonnull StateTypeHint stateTypeHint) { + + if (readVersion == CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) { + // latest version shortcut + return CurrentReaderImpl.INSTANCE; + } + + if (readVersion > CURRENT_STATE_META_INFO_SNAPSHOT_VERSION) { + throw new IllegalArgumentException("Unsupported read version for state meta info: " + readVersion); + } + + switch (stateTypeHint) { --- End diff -- ð ---
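The `getReader(readVersion, stateTypeHint)` dispatch in the diff above can be illustrated with a simplified sketch. All names here (`MetaInfoReader`, `MetaInfoReaders`, the `describe()` method) are hypothetical and only mirror the structure: the current version gets a shortcut that ignores the hint, newer versions are rejected, and older versions pick a per-state-type legacy reader based on the hint.

```java
/** Hypothetical reader abstraction; describe() just names which reader was chosen. */
interface MetaInfoReader {
    String describe();
}

/** Mirrors the StateTypeHint enum from the diff. */
enum StateTypeHint { KEYED_STATE, OPERATOR_STATE }

final class MetaInfoReaders {
    static final int CURRENT_VERSION = 5;

    private MetaInfoReaders() {}

    static MetaInfoReader getReader(int readVersion, StateTypeHint hint) {
        if (readVersion == CURRENT_VERSION) {
            // Latest version shortcut: the unified format does not need the hint.
            return () -> "current";
        }
        if (readVersion > CURRENT_VERSION) {
            throw new IllegalArgumentException("Unsupported read version for state meta info: " + readVersion);
        }
        // Older formats were written differently per state type, so the hint selects the reader.
        switch (hint) {
            case KEYED_STATE:
                return () -> "legacy-keyed-v" + readVersion;
            case OPERATOR_STATE:
                return () -> "legacy-operator-v" + readVersion;
            default:
                throw new IllegalArgumentException("Unknown state type hint: " + hint);
        }
    }
}
```

Keeping the hint only on the legacy path is what lets the hint enum be removed once compatibility with versions below 5 is dropped, as the TODO in the diff suggests.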
[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6313#discussion_r202036711 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java --- @@ -84,52 +86,89 @@ public boolean contains(UK key) throws Exception { @Override public Iterable> entries() throws Exception { - return entriesStream()::iterator; + return entries(e -> e); } - private Stream> entriesStream() throws Exception { + private Iterable entries( + Function, R> resultMapper) throws Exception { Iterable>> withTs = original.entries(); - withTs = withTs == null ? Collections.emptyList() : withTs; - return StreamSupport - .stream(withTs.spliterator(), false) - .filter(this::unexpiredAndUpdateOrCleanup) - .map(TtlMapState::unwrapWithoutTs); - } - - private boolean unexpiredAndUpdateOrCleanup(Map.Entry> e) { - UV unexpiredValue; - try { - unexpiredValue = getWithTtlCheckAndUpdate( - e::getValue, - v -> original.put(e.getKey(), v), - () -> original.remove(e.getKey())); - } catch (Exception ex) { - throw new FlinkRuntimeException(ex); - } - return unexpiredValue != null; - } - - private static Map.Entry unwrapWithoutTs(Map.Entry> e) { - return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue()); + return () -> new EntriesIterator<>(withTs == null ? 
Collections.emptyList() : withTs, resultMapper); } @Override public Iterable keys() throws Exception { - return entriesStream().map(Map.Entry::getKey)::iterator; + return entries(Map.Entry::getKey); } @Override public Iterable values() throws Exception { - return entriesStream().map(Map.Entry::getValue)::iterator; + return entries(Map.Entry::getValue); } @Override public Iterator> iterator() throws Exception { - return entriesStream().iterator(); + return entries().iterator(); } @Override public void clear() { original.clear(); } + + private class EntriesIterator implements Iterator { + private final Iterator>> originalIterator; + private final Function, R> resultMapper; + private Map.Entry nextUnexpired = null; + private boolean rightAfterNextIsCalled = false; + + private EntriesIterator( + @Nonnull Iterable>> withTs, + @Nonnull Function, R> resultMapper) { + this.originalIterator = withTs.iterator(); + this.resultMapper = resultMapper; + } + + @Override + public boolean hasNext() { + rightAfterNextIsCalled = false; + while (nextUnexpired == null && originalIterator.hasNext()) { + nextUnexpired = getUnexpiredAndUpdateOrCleanup(originalIterator.next()); + } + return nextUnexpired != null; + } + + @Override + public R next() { + if (hasNext()) { + rightAfterNextIsCalled = true; + R result = resultMapper.apply(nextUnexpired); + nextUnexpired = null; + return result; + } + throw new NoSuchElementException(); + } + + @Override + public void remove() { + if (rightAfterNextIsCalled) { --- End diff -- I think this is problematic, for example in the sequence `hasNext()`, `next()`, `hasNext()`, `remove()`, which is a valid interaction. ---
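A minimal sketch of why that sequence is hard to support, using a hypothetical `LookaheadFilterIterator` that is modeled on, but not identical to, the `EntriesIterator` under review. Because `hasNext()` must pull elements from the underlying iterator to find the next match, after a look-ahead the underlying cursor no longer points at the element that `next()` returned, so `remove()` cannot simply delegate. This sketch only clears the flag when the cursor actually moves, but the `hasNext()`, `next()`, `hasNext()`, `remove()` sequence still has to be rejected.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Hypothetical filtering iterator with look-ahead; null elements are not supported. */
class LookaheadFilterIterator<T> implements Iterator<T> {
    private final Iterator<T> original;
    private final Predicate<T> keep;
    private T nextMatch;       // element found by look-ahead, or null if none is buffered
    private boolean canRemove; // true only while original's cursor still sits on the last returned element

    LookaheadFilterIterator(Iterator<T> original, Predicate<T> keep) {
        this.original = original;
        this.keep = keep;
    }

    @Override
    public boolean hasNext() {
        while (nextMatch == null && original.hasNext()) {
            canRemove = false; // the cursor is about to move past the last returned element
            T candidate = original.next();
            if (keep.test(candidate)) {
                nextMatch = candidate;
            }
        }
        return nextMatch != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = nextMatch;
        nextMatch = null;
        canRemove = true; // original's last returned element is exactly 'result'
        return result;
    }

    @Override
    public void remove() {
        if (!canRemove) {
            // next() not called yet, remove() already called, or hasNext() looked ahead.
            throw new IllegalStateException("remove() is not possible after look-ahead");
        }
        original.remove();
        canRemove = false;
    }
}
```

The alternative that avoids the contract violation altogether is to not support `remove()` at all, which is the trade-off discussed in the follow-up comment.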
[GitHub] flink pull request #6313: [FLINK-9701] Add TTL in state descriptors
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6313#discussion_r202032512 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java --- @@ -92,6 +93,10 @@ @Nullable private String queryableStateName; + /** Name for queries against state created from this StateDescriptor. */ + @Nullable + private StateTtlConfiguration ttlConfig; --- End diff -- I would suggest preferring `@Nonnull` and a `StateTtlConfiguration` that represents disabled TTL, so that the getter never returns `null` and code can drop the `null` checks. ---
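A minimal sketch of the suggested null-object approach. `TtlConfig` and `SimpleStateDescriptor` are hypothetical, simplified stand-ins, not Flink's `StateTtlConfiguration` or `StateDescriptor`. A shared `DISABLED` instance replaces `null`, the field is initialized to it, and the setter rejects `null`, so callers can always ask `isEnabled()` instead of checking for `null`.

```java
import java.util.Objects;

/** Hypothetical, simplified TTL configuration with a "disabled" sentinel instead of null. */
final class TtlConfig {
    /** Shared instance meaning "TTL is disabled". */
    static final TtlConfig DISABLED = new TtlConfig(Long.MAX_VALUE, false);

    private final long ttlMillis;
    private final boolean enabled;

    private TtlConfig(long ttlMillis, boolean enabled) {
        this.ttlMillis = ttlMillis;
        this.enabled = enabled;
    }

    static TtlConfig ofMillis(long ttlMillis) {
        if (ttlMillis <= 0) {
            throw new IllegalArgumentException("TTL must be positive: " + ttlMillis);
        }
        return new TtlConfig(ttlMillis, true);
    }

    boolean isEnabled() { return enabled; }

    long getTtlMillis() { return ttlMillis; }
}

/** Hypothetical descriptor whose TTL config is never null. */
class SimpleStateDescriptor {
    private TtlConfig ttlConfig = TtlConfig.DISABLED;

    void setTtlConfig(TtlConfig config) {
        this.ttlConfig = Objects.requireNonNull(config, "ttlConfig");
    }

    /** Never returns null; callers check isEnabled() instead. */
    TtlConfig getTtlConfig() { return ttlConfig; }
}
```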
[GitHub] flink issue #6277: [FLINK-9511] Implement TTL config
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6277 LGTM, merging. ---
[GitHub] flink issue #6308: [FLINK-9799] Generalize and unify state meta info snapsho...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6308 CC @azagrebin ---
[GitHub] flink pull request #6308: [FLINK-9799] Generalize and unify state meta info ...
GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/6308 [FLINK-9799] Generalize and unify state meta info snapshot ## What is the purpose of the change This PR generalizes and unifies the de/serialization of state meta information in backends. We replace the snapshots and reader/writers of the individual state types with a general `StateMetaInfoSnapshot` and the corresponding `StateMetaInfoSnapshotReadersWriters`. Backwards compatibility is maintained. ## Verifying this change This change is already covered by existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink FLINK-9799-generalize-state-meta-pr Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6308.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6308 commit 5e44f759342793f4532e99f8df589c2416402176 Author: Stefan Richter Date: 2018-07-11T09:11:11Z [FLINK-9799][state] Generalize and unify state meta infos ---
[GitHub] flink issue #6276: [FLINK-9486] Introduce TimerState in keyed state backend
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6276 @tillrohrmann Thanks for the fast review. Merging. ---
[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6276#discussion_r201013659 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupedInternalPriorityQueue.java --- @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nonnull; + +import java.util.Set; + +/** + * + */ +@Nonnull +public interface KeyGroupedInternalPriorityQueue extends InternalPriorityQueue { + Set getSubsetForKeyGroup(int keyGroupId); --- End diff -- They are all added, in one of the later commits. ---
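To illustrate what the `getSubsetForKeyGroup(int keyGroupId)` method in the interface above offers, here is a minimal sketch of a key-grouped priority queue with set semantics. `SimpleKeyGroupedQueue` is hypothetical and unrelated to Flink's heap or RocksDB implementations; it simply keeps a `PriorityQueue` for ordering plus a per-key-group index of the same elements.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.ToIntFunction;

/** Hypothetical priority queue that also indexes its elements by key group. */
class SimpleKeyGroupedQueue<T> {
    private final PriorityQueue<T> queue;
    private final Map<Integer, Set<T>> byKeyGroup = new HashMap<>();
    private final ToIntFunction<T> keyGroupOf;

    SimpleKeyGroupedQueue(Comparator<T> priority, ToIntFunction<T> keyGroupOf) {
        this.queue = new PriorityQueue<>(priority);
        this.keyGroupOf = keyGroupOf;
    }

    /** Adds the element unless it is already present (set semantics, as for timers). */
    boolean add(T element) {
        Set<T> subset = byKeyGroup.computeIfAbsent(keyGroupOf.applyAsInt(element), k -> new HashSet<>());
        if (!subset.add(element)) {
            return false;
        }
        queue.add(element);
        return true;
    }

    /** Removes and returns the head, keeping the key-group index in sync. */
    T poll() {
        T head = queue.poll();
        if (head != null) {
            int keyGroup = keyGroupOf.applyAsInt(head);
            Set<T> subset = byKeyGroup.get(keyGroup);
            subset.remove(head);
            if (subset.isEmpty()) {
                byKeyGroup.remove(keyGroup);
            }
        }
        return head;
    }

    /** Read-only view of the elements that belong to one key group, e.g. for snapshotting. */
    Set<T> getSubsetForKeyGroup(int keyGroupId) {
        return Collections.unmodifiableSet(byKeyGroup.getOrDefault(keyGroupId, Collections.emptySet()));
    }
}
```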
[GitHub] flink issue #6275: [FLINK-9776] [runtime] Stop sending periodic interrupts o...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6275 LGTM ---
[GitHub] flink pull request #6275: [FLINK-9776] [runtime] Stop sending periodic inter...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6275#discussion_r200911028 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java --- @@ -1563,7 +1573,7 @@ public void run() { // log stack trace where the executing thread is stuck and // interrupt the running thread periodically while it is still alive - while (executerThread.isAlive()) { + while (task.shouldInterruptOnCancel() && executerThread.isAlive()) { --- End diff -- Ok, if the intention is improvement and not 100% certainty, then this is perfectly ok. ---
[GitHub] flink pull request #6275: [FLINK-9776] [runtime] Stop sending periodic inter...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6275#discussion_r200737014 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java --- @@ -1563,7 +1573,7 @@ public void run() { // log stack trace where the executing thread is stuck and // interrupt the running thread periodically while it is still alive - while (executerThread.isAlive()) { + while (task.shouldInterruptOnCancel() && executerThread.isAlive()) { --- End diff -- I think an atomic boolean might be required. This check can pass, then the interrupt is sent, while in the meantime the stream task may already have entered the shutdown code, so the interrupt might slip through? ---
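A minimal sketch of one way to close the check-then-act gap described above, assuming the goal is that no interrupt can arrive once the task has disabled interrupts. `InterruptGuard` and its method names are hypothetical, not Flink's `Task` API. It uses a lock rather than only an atomic flag, because reading an atomic flag and then calling `interrupt()` are still two separate steps; holding the lock across both makes them atomic with respect to `disableInterrupts()`.

```java
/** Hypothetical guard shared by the canceler thread and the task thread. */
class InterruptGuard {
    private final Object lock = new Object();
    private boolean interruptAllowed = true; // guarded by lock

    /** Called by the canceler: interrupts the target only while interrupts are still allowed. */
    boolean interruptIfAllowed(Thread target) {
        synchronized (lock) {
            if (interruptAllowed) {
                target.interrupt();
                return true;
            }
            return false;
        }
    }

    /**
     * Called by the task thread itself before entering its shutdown code. After this returns,
     * no further interrupts are sent, and any interrupt delivered earlier has been cleared.
     */
    void disableInterrupts() {
        synchronized (lock) {
            interruptAllowed = false;
        }
        // Clears the calling thread's interrupt flag, so this must run on the task thread.
        Thread.interrupted();
    }
}
```

In the watchdog loop, the `shouldInterruptOnCancel()` check followed by a separate interrupt would become a single `interruptIfAllowed(executerThread)` call. As noted in the follow-up comment, a best-effort check may be acceptable if full certainty is not required.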
[GitHub] flink issue #6276: [FLINK-9486] Introduce TimerState in keyed state backend
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6276 CC @tillrohrmann ---
[GitHub] flink pull request #6276: [FLINK-9486] Introduce TimerState in keyed state b...
GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/6276 [FLINK-9486] Introduce TimerState in keyed state backend ## What is the purpose of the change This PR integrates `InternalTimerQueue` with keyed state backends (Heap and RocksDB), so that we can use the RocksDB-based version in jobs for the first time. We introduce the interface `KeyGroupPartitionedPriorityQueue` as an easy adapter to the existing snapshotting code. This can probably be removed in a follow-up PR, once the queues are fully integrated with the backend's snapshotting. The PR also addresses an issue with the `TreeOrderedCache`, which requires a "proper" `Comparator` (implemented in `TieBreakingPriorityComparator`), and we introduce `PriorityComparator` to put more emphasis on this difference. `TieBreakingPriorityComparator` is likely to go away as well once we come up with an improved cache that is not simply based on a tree. We introduce `PriorityQueueSetFactory` to the keyed state backends, and this is where the queues are built. The current RocksDB implementation uses an additional RocksDB instance until we are fully integrated with backend snapshotting, because otherwise we run into trouble with incremental snapshots. A configuration parameter is introduced to choose the queue implementation for RocksDB; the default still uses the heap variant for now. Finally, we introduce an additional method for bulk polling in the `InternalTimerQueue` interface that opens up future optimizations. ## Verifying this change This change is already covered by existing tests, such as `AbstractEventTimeWindowCheckpointingITCase`, but you would currently need to activate it via `RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE`.
## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes, if activated) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/StefanRRichter/flink integrateSetStateWithBackends Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6276.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6276 commit 0d8743e52a658876425b6cef03fef3fef2d09deb Author: Stefan Richter Date: 2018-07-04T11:43:49Z Remove read options from RocksDBOrderedSetStore commit 84b1b36357322cf23d50396cbfa0725db95797ea Author: Stefan Richter Date: 2018-07-04T11:51:14Z Introduce (temporary?/visible for testing) KeyGroupPartitionedPriorityQueue interface to work with the existing snapshotting commit 35e02705f6740854ae18a92b5a6dfbafd3201a8f Author: Stefan Richter Date: 2018-07-04T16:07:54Z Basic integration with backends / make Rocks timers work commit 1294ac356162430cf9de86980de1d4a0154f33b8 Author: Stefan Richter Date: 2018-07-05T16:46:34Z Introduce PriorityComparator and tie breaking variant as adapter to collections that require a comparator. 
This is required because the tree set that we use in the cache expects that Comparators are aligned with Object#equals commit bfd3a12e77348a79c91656d80a7a67ece9825103 Author: Stefan Richter Date: 2018-07-05T19:35:08Z Iterator directly from cache if no store-only elements. commit fbf26f1f2efbe1e2029d09d297808e26e08b87d8 Author: Stefan Richter Date: 2018-07-06T08:22:49Z Use a dedicated RocksDB instance for priority queue state. We can revert this once priority queue state is properly integrated with the snapshotting. Until then, we must isolate the priority queue state in a separate db or else incremental checkpoints will break. commit 75cb05ab21e07eaed25e1cac048919f7f313b3f6 Author: Stefan Richter Date: 2018-07-06T13:55:02Z Configuration part commit 7a86e268072ec4ad9d9fae2fa8e852b66d4424a8 Author: Stefan Richter Date: 2018-07-06T14:48:53Z Introduce bulk poll method in queue to open up future optimizations ---
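The commit note above says the tree set used in the cache expects comparators that are aligned with `Object#equals`. A minimal sketch of why, using a hypothetical `TimerEntry` type (not Flink's timer class) and hypothetical comparator names: with a priority-only comparator, a `TreeSet` treats two distinct timers with the same timestamp as duplicates and silently drops one, while a tie-breaking comparator only returns 0 for elements that are also `equals`.

```java
import java.util.Comparator;
import java.util.Objects;

/** Hypothetical timer element: ordered by timestamp, identified by (timestamp, key). */
final class TimerEntry {
    final long timestamp;
    final String key;

    TimerEntry(long timestamp, String key) {
        this.timestamp = timestamp;
        this.key = key;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TimerEntry)) {
            return false;
        }
        TimerEntry that = (TimerEntry) o;
        return timestamp == that.timestamp && key.equals(that.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, key);
    }
}

final class TimerComparators {
    private TimerComparators() {}

    /** Priority only: distinct timers with equal timestamps compare as 0 (inconsistent with equals). */
    static final Comparator<TimerEntry> BY_PRIORITY =
        Comparator.comparingLong((TimerEntry t) -> t.timestamp);

    /** Tie-breaking: falls back to the key, so compare == 0 exactly when equals is true. */
    static final Comparator<TimerEntry> TIE_BREAKING =
        BY_PRIORITY.thenComparing((TimerEntry t) -> t.key);
}
```

A heap-based priority queue only needs the priority order, which is why the sketch keeps the two comparators separate, in the spirit of the `PriorityComparator` and `TieBreakingPriorityComparator` split described in the PR.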
[GitHub] flink issue #6251: [FLINK-9693] Set Execution#taskRestore to null after depl...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6251 LGTM ---
[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6196 I found there is still a small issue with the equals/hashCode but will just fix it before merging. ---
[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6196 LGTM, merging. ---
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199894398 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -272,4 +254,60 @@ public int getVersion() { previousSerializersAndConfigs.get(index).f0, UnloadableDummyTypeSerializer.class, previousSerializersAndConfigs.get(index).f1, fieldSerializers[index]); } + + /** This class holds composite serializer parameters which can be precomputed in advanced for better performance. */ + protected static class PrecomputedParameters implements Serializable { + /** Whether target type is immutable. */ + final boolean immutableTargetType; + + /** Whether target type and its fields are immutable. */ + final boolean immutable; + + /** Byte length of target object in serialized form. */ + private final int length; + + /** Whether any field serializer is stateful. */ + final boolean stateful; + + final int hashCode; --- End diff -- I wonder if this should be `transient` in a serializable class, since the hash code could be based on object identity. ---
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199894217 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -272,4 +254,60 @@ public int getVersion() { previousSerializersAndConfigs.get(index).f0, UnloadableDummyTypeSerializer.class, previousSerializersAndConfigs.get(index).f1, fieldSerializers[index]); } + + /** This class holds composite serializer parameters which can be precomputed in advanced for better performance. */ + protected static class PrecomputedParameters implements Serializable { --- End diff -- If this is serializable, we should add a `serialVersionUID`. ---
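The two suggestions on `PrecomputedParameters` (an explicit `serialVersionUID`, and a `transient` cached hash code) can be combined in a small sketch. `PrecomputedParams` is a hypothetical, simplified stand-in, not the class under review. The cached hash is not written to the stream and is recomputed lazily after deserialization, so a value derived from identity-based hash codes in one JVM can never be carried over stale into another.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical, simplified stand-in for a serializable parameter holder. */
class PrecomputedParams implements Serializable {
    // Explicit UID, so compatible class changes do not break deserialization of old instances.
    private static final long serialVersionUID = 1L;

    final boolean immutable;
    final int length;

    // Not serialized; 0 means "not computed yet", so it is recomputed after deserialization.
    private transient int cachedHash;

    PrecomputedParams(boolean immutable, int length) {
        this.immutable = immutable;
        this.length = length;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PrecomputedParams)) {
            return false;
        }
        PrecomputedParams that = (PrecomputedParams) o;
        return immutable == that.immutable && length == that.length;
    }

    @Override
    public int hashCode() {
        int h = cachedHash;
        if (h == 0) {
            h = Objects.hash(immutable, length);
            cachedHash = h;
        }
        return h;
    }

    /** Demo helper: Java-serializes and deserializes an instance in memory. */
    static PrecomputedParams roundTrip(PrecomputedParams p) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(p);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PrecomputedParams) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```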
[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6228 @sihuazhou @azagrebin thanks for the reviews! I will merge this once my Travis build is green. ---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199841799 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java --- @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Testbase for implementations of {@link InternalPriorityQueue}. + */ +public abstract class InternalPriorityQueueTestBase extends TestLogger { --- End diff -- good point 👍 ---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199817513 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java --- @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Testbase for implementations of {@link InternalPriorityQueue}. 
+ */ +public abstract class InternalPriorityQueueTestBase extends TestLogger { + + protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2); + protected static final KeyExtractorFunction KEY_EXTRACTOR_FUNCTION = TestElement::getKey; + protected static final Comparator TEST_ELEMENT_COMPARATOR = + Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey); + + protected static void insertRandomTimers( + @Nonnull InternalPriorityQueue priorityQueue, + @Nonnull Set checkSet, + int count) { + + ThreadLocalRandom localRandom = ThreadLocalRandom.current(); + + final int numUniqueKeys = Math.max(count / 4, 64); + + long duplicatePriority = Long.MIN_VALUE; + + for (int i = 0; i < count; ++i) { + TestElement element; + do { + long elementPriority; + if (duplicatePriority == Long.MIN_VALUE) { + elementPriority = localRandom.nextLong(); + } else { + elementPriority = duplicatePriority; + duplicatePriority = Long.MIN_VALUE; + } + element = new TestElement(localRandom.nextInt(numUniqueKeys), elementPriority); + } while (!checkSet.add(element)); + + if (localRandom.nextInt(10) == 0) { + duplicatePriority = element.getPriority(); + } + + final boolean headChangedIndicated = priorityQueue.add(element); + if (element.equals(priorityQueue.peek())) { + Assert.assertTrue(headChangedIndicated); + } + } + Assert.assertEquals(count, priorityQueue.size()); + } + + @Test + public void testPeekPollOrder() { + final int initialCapacity = 4; + final int testSize = 1000; + InternalPriorityQueue priorityQueue = + newPriorityQueue(initialCapacity); + HashSet checkSet = new HashSet<>(testSize); + + insertRandomTimers(priorityQueue, checkSet, testSize); + + long lastPriorityValue = Long.MIN_VALUE; + int
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199817285 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199817158 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199816750 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java ---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199816552 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/InternalPriorityQueueTestBase.java --- @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Testbase for implementations of {@link InternalPriorityQueue}. + */ +public abstract class InternalPriorityQueueTestBase extends TestLogger { + + protected static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 2); + protected static final KeyExtractorFunction KEY_EXTRACTOR_FUNCTION = TestElement::getKey; + protected static final Comparator TEST_ELEMENT_COMPARATOR = + Comparator.comparingLong(TestElement::getPriority).thenComparingLong(TestElement::getKey); + + protected static void insertRandomTimers( --- End diff -- 👍 ---
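The randomized insertion done by `insertRandomTimers` can be sketched against `java.util.PriorityQueue`. Everything here is an illustrative assumption rather than Flink code: `Element`, the `HeadTrackingQueue` wrapper (whose `add` reports whether the head changed, mirroring how the quoted test uses the return value of `InternalPriorityQueue#add`) and `RandomInsertCheck`. As in the quoted loop, elements stay unique via the check set, and roughly one time in ten the previous priority is reused with a fresh random key, so ties on priority are broken by key.

```java
import java.util.Comparator;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

/** Illustrative element with a key and a priority (hypothetical, not Flink's TestElement). */
final class Element {
    final long key;
    final long priority;

    Element(long key, long priority) {
        this.key = key;
        this.priority = priority;
    }

    long getKey() { return key; }

    long getPriority() { return priority; }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Element)) {
            return false;
        }
        Element e = (Element) o;
        return key == e.key && priority == e.priority;
    }

    @Override
    public int hashCode() { return Objects.hash(key, priority); }
}

/** Hypothetical wrapper whose add() returns true iff the new element became the head. */
final class HeadTrackingQueue {
    // Order by priority, then by key, like TEST_ELEMENT_COMPARATOR in the diff.
    static final Comparator<Element> ORDER =
        Comparator.comparingLong(Element::getPriority).thenComparingLong(Element::getKey);

    private final PriorityQueue<Element> queue = new PriorityQueue<>(ORDER);

    boolean add(Element e) {
        Element oldHead = queue.peek();
        queue.add(e);
        return oldHead == null || ORDER.compare(e, oldHead) < 0;
    }

    Element peek() { return queue.peek(); }

    Element poll() { return queue.poll(); }

    int size() { return queue.size(); }
}

final class RandomInsertCheck {
    /** Mirrors the structure of the quoted loop: unique elements, occasional repeated priorities. */
    static void insertRandom(HeadTrackingQueue pq, Set<Element> checkSet, int count) {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        int numUniqueKeys = Math.max(count / 4, 64);
        long duplicatePriority = Long.MIN_VALUE; // sentinel: no repeated priority pending
        for (int i = 0; i < count; ++i) {
            Element element;
            do {
                long priority;
                if (duplicatePriority == Long.MIN_VALUE) {
                    priority = rnd.nextLong();
                } else {
                    priority = duplicatePriority;
                    duplicatePriority = Long.MIN_VALUE;
                }
                element = new Element(rnd.nextInt(numUniqueKeys), priority);
            } while (!checkSet.add(element)); // retry until the element is new
            if (rnd.nextInt(10) == 0) {
                duplicatePriority = element.getPriority();
            }
            boolean headChanged = pq.add(element);
            if (element.equals(pq.peek()) && !headChanged) {
                throw new AssertionError("new head was not reported by add()");
            }
        }
    }

    /** Polls until empty, checking non-decreasing order; returns the number of polled elements. */
    static int drainInOrder(HeadTrackingQueue pq) {
        Element last = null;
        int polled = 0;
        Element e;
        while ((e = pq.poll()) != null) {
            if (last != null && HeadTrackingQueue.ORDER.compare(last, e) > 0) {
                throw new AssertionError("elements polled out of order");
            }
            last = e;
            polled++;
        }
        return polled;
    }
}
```

`drainInOrder` performs the kind of peek/poll ordering check that the truncated `testPeekPollOrder` appears to set up with `lastPriorityValue`.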
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199816139 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java --- @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.runtime.state.InternalPriorityQueue; +import org.apache.flink.runtime.state.KeyExtractorFunction; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.IOUtils; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Comparator; + +/** + * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and + * essentially works as a heap-of-heaps. Instances will have set semantics for elements if the sub-queues have set + * semantics. + * + * @param the type of elements in the queue. 
+ * @param type type of sub-queue used for each key-group partition. + */ +public class KeyGroupPartitionedPriorityQueue & HeapPriorityQueueElement> + implements InternalPriorityQueue { + + /** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/ + @Nonnull + private final HeapPriorityQueue keyGroupHeap; + + /** All elements from keyGroupHeap, indexed by their key-group id, relative to firstKeyGroup. */ + @Nonnull + private final PQ[] keyGroupLists; + + /** Function to extract the key from contained elements. */ + @Nonnull + private final KeyExtractorFunction keyExtractor; + + /** The total number of key-groups (in the job). */ + @Nonnegative + private final int totalKeyGroups; + + /** The smallest key-group id with a subpartition managed by this ordered set. */ + @Nonnegative + private final int firstKeyGroup; + + @SuppressWarnings("unchecked") + public KeyGroupPartitionedPriorityQueue( + @Nonnull KeyExtractorFunction keyExtractor, + @Nonnull Comparator elementComparator, + @Nonnull PartitionQueueSetFactory orderedCacheFactory, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalKeyGroups) { + + this.keyExtractor = keyExtractor; + this.totalKeyGroups = totalKeyGroups; + this.firstKeyGroup = keyGroupRange.getStartKeyGroup(); + this.keyGroupLists = (PQ[]) new InternalPriorityQueue[keyGroupRange.getNumberOfKeyGroups()]; + this.keyGroupHeap = new HeapPriorityQueue<>( + new InternalPriorityQueueComparator<>(elementComparator), + keyGroupRange.getNumberOfKeyGroups()); + for (int i = 0; i < keyGroupLists.length; i++) { + final PQ keyGroupCache = + orderedCacheFactory.create(firstKeyGroup + i, totalKeyGroups, elementComparator); + keyGroupLists[i] = keyGroupCache; + keyGroupHeap.add(keyGroupCache); + } + } + + @Nullable + @Override + public T poll() { + final PQ headList = keyGroupHeap.peek(); + final T head = headList.poll(); + keyGroupHeap.adjustModifiedElement(headList); + return head; + } + + @Nullable + @Override + public T 
peek() { + return keyGroupHeap.peek().peek(); + } + + @Override + public boolean add(@Nonnull T toAdd) { + final PQ list = getListForElementKeyGroup(toAdd); + + // the branch c
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199816118 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedSetStore.java --- @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.FlinkRuntimeException; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.NoSuchElementException; + +/** + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore} + * based on RocksDB. + * + * IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences + * produced by the provided serializer for the elements! + * + * @param the type of stored elements. + */ +public class RocksDBOrderedSetStore implements CachingInternalPriorityQueueSet.OrderedSetStore { + + /** Serialized empty value to insert into RocksDB. */ + private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET); --- End diff -- 👍 ---
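The Javadoc warning that the store's order is the lexicographic order of the serialized bytes matters for signed values such as timestamps. RocksDB's default bytewise comparator compares keys as unsigned byte strings, so a plain big-endian two's-complement `long` sorts negative values after positive ones. A minimal sketch, assuming an illustrative encoding (not necessarily what Flink's serializers produce), flips the sign bit before writing big-endian bytes so that byte order matches numeric order. `LexicographicLongKeys` and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;

/** Hypothetical helpers showing why byte encoding determines order in a bytewise-sorted store. */
final class LexicographicLongKeys {

    /** Big-endian with the sign bit flipped: unsigned byte order then equals signed long order. */
    static byte[] encode(long value) {
        // ByteBuffer writes big-endian by default.
        return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
    }

    /** Plain big-endian two's complement, for comparison: negatives sort after positives. */
    static byte[] encodeNaive(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Unsigned lexicographic comparison, as a bytewise comparator would do. */
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        // A shorter prefix sorts first.
        return Integer.compare(a.length, b.length);
    }
}
```

With `encodeNaive`, `-1L` becomes `0xFF...FF` and sorts after `0L` (`0x00...00`); with `encode`, it becomes `0x7F...FF` and sorts before `0x80...00`.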
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199815863 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/KeyGroupPartitionedPriorityQueue.java --- @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.runtime.state.InternalPriorityQueue; +import org.apache.flink.runtime.state.KeyExtractorFunction; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.IOUtils; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Comparator; + +/** + * This implementation of {@link InternalPriorityQueue} is internally partitioned into sub-queues per key-group and + * essentially works as a heap-of-heaps. Instances will have set semantics for elements if the sub-queues have set + * semantics. + * + * @param the type of elements in the queue. 
+ * @param type type of sub-queue used for each key-group partition. + */ +public class KeyGroupPartitionedPriorityQueue & HeapPriorityQueueElement> + implements InternalPriorityQueue { + + /** A heap of heap sets. Each sub-heap represents the partition for a key-group.*/ + @Nonnull + private final HeapPriorityQueue keyGroupHeap; + + /** All elements from keyGroupHeap, indexed by their key-group id, relative to firstKeyGroup. */ + @Nonnull + private final PQ[] keyGroupLists; --- End diff -- 👍 ---
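The heap-of-heaps described in the class Javadoc can be sketched with `java.util.PriorityQueue`: one sub-heap per key-group, plus an outer heap that orders the sub-heaps by their current head. This is a simplified, hypothetical version (`KeyGroupedHeap` is not a Flink class). It stores `long` values, covers all key-groups from 0 rather than a `KeyGroupRange`, requires at least one key-group, and assigns key-groups with a plain modulo instead of the hashing done via `KeyGroupRangeAssignment`. Since `java.util.PriorityQueue` has no equivalent of `adjustModifiedElement`, the sketch removes a sub-heap from the outer heap before changing it and re-inserts it afterwards.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical, simplified heap-of-heaps partitioned by key-group. */
final class KeyGroupedHeap {
    private final int totalKeyGroups;
    // Sub-heaps indexed by key-group id.
    private final List<PriorityQueue<Long>> subHeaps = new ArrayList<>();
    // Outer heap ordered by each sub-heap's current head.
    private final PriorityQueue<PriorityQueue<Long>> outer;

    /** Assumes totalKeyGroups >= 1. */
    KeyGroupedHeap(int totalKeyGroups) {
        this.totalKeyGroups = totalKeyGroups;
        // Empty sub-heaps sort last; otherwise compare by head element.
        Comparator<PriorityQueue<Long>> byHead = (a, b) -> {
            Long ha = a.peek();
            Long hb = b.peek();
            if (ha == null) {
                return hb == null ? 0 : 1;
            }
            if (hb == null) {
                return -1;
            }
            return Long.compare(ha, hb);
        };
        this.outer = new PriorityQueue<>(byHead);
        for (int i = 0; i < totalKeyGroups; i++) {
            PriorityQueue<Long> sub = new PriorityQueue<>();
            subHeaps.add(sub);
            outer.add(sub);
        }
    }

    // Simplified assignment: the element acts as its own key, reduced modulo the key-group count.
    private int keyGroupOf(long element) {
        return (int) Math.floorMod(element, (long) totalKeyGroups);
    }

    void add(long element) {
        PriorityQueue<Long> sub = subHeaps.get(keyGroupOf(element));
        outer.remove(sub); // identity-based removal: PriorityQueue does not override equals
        sub.add(element);
        outer.add(sub);    // re-insert so the outer heap sees the new head
    }

    /** Smallest element overall, or null if empty. */
    Long peek() {
        return outer.peek().peek();
    }

    /** Removes and returns the smallest element overall, or null if empty. */
    Long poll() {
        PriorityQueue<Long> head = outer.poll();
        Long result = head.poll();
        outer.add(head); // head sub-heap changed, so put it back in order
        return result;
    }
}
```

`PriorityQueue.remove(Object)` is linear in the number of key-groups, so an in-place adjust, like the `adjustModifiedElement` call in the quoted `poll`, is cheaper than this remove-and-re-add approach.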
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199814790 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueueSet.java --- @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.KeyExtractorFunction; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A heap-based priority queue with set semantics, based on {@link HeapPriorityQueue}. The heap is supported by hash + * set for fast contains (de-duplication) and deletes. Object identification happens based on {@link #equals(Object)}. 
+ * + * Possible future improvements: + * + * We could also implement shrinking for the heap and the deduplication set. + * We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set + * would be enough if it could return existing elements on unsuccessful adding, etc.. + * + * + * @param type of the contained elements. + */ +public class HeapPriorityQueueSet extends HeapPriorityQueue { + + /** +* Function to extract the key from contained elements. +*/ + private final KeyExtractorFunction keyExtractor; + + /** +* This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of elements. +*/ + private final HashMap[] deduplicationMapsByKeyGroup; + + /** +* The key-group range of elements that are managed by this queue. +*/ + private final KeyGroupRange keyGroupRange; + + /** +* The total number of key-groups of the job. +*/ + private final int totalNumberOfKeyGroups; + + /** +* Creates an empty {@link HeapPriorityQueueSet} with the requested initial capacity. +* +* @param elementComparator comparator for the contained elements. +* @param keyExtractor function to extract a key from the contained elements. +* @param minimumCapacity the minimum and initial capacity of this priority queue. +* @param keyGroupRange the key-group range of the elements in this set. +* @param totalNumberOfKeyGroups the total number of key-groups of the job. 
+*/ + @SuppressWarnings("unchecked") + public HeapPriorityQueueSet( + @Nonnull Comparator elementComparator, + @Nonnull KeyExtractorFunction keyExtractor, + @Nonnegative int minimumCapacity, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalNumberOfKeyGroups) { + + super(elementComparator, minimumCapacity); + + this.keyExtractor = keyExtractor; + + this.totalNumberOfKeyGroups = totalNumberOfKeyGroups; + this.keyGroupRange = keyGroupRange; + + final int keyGroupsInLocalRange = keyGroupRange.getNumberOfKeyGroups(); + final int deduplicationSetSize = 1 + minimumCapacity / keyGroupsInLocalRange; + this.deduplicationMapsByKeyGroup = new HashMap[keyGroupsInLocalRange]; + for (int i = 0; i < keyGroupsInLocalRange; ++i) { + deduplicationMapsByKeyGroup[i] = new HashMap<>(deduplicationSetSize); + } + } + + @Override + @Nullable + pu
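The class comment above describes set semantics that come from one hash map per key-group of the local key-group range. Below is a minimal sketch of only that de-duplication bookkeeping. `KeyGroupedDedupIndex` and its methods are hypothetical names, and `java.util.function.Function` stands in for `KeyExtractorFunction`. The key-group assignment (`Math.floorMod(key.hashCode(), totalNumberOfKeyGroups)`) is also a simplification, because Flink's `KeyGroupRangeAssignment` additionally applies a murmur hash to the key's hash code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch: one de-duplication map per key-group in a contiguous local range. */
final class KeyGroupedDedupIndex<T> {
    private final Function<T, Object> keyExtractor;  // stands in for KeyExtractorFunction
    private final int startKeyGroup;                  // first key-group of the local range
    private final int totalNumberOfKeyGroups;         // key-groups of the whole job
    private final Map<T, T>[] mapsByKeyGroup;

    @SuppressWarnings("unchecked")
    KeyGroupedDedupIndex(Function<T, Object> keyExtractor, int startKeyGroup, int endKeyGroup,
            int totalNumberOfKeyGroups, int minimumCapacity) {
        this.keyExtractor = keyExtractor;
        this.startKeyGroup = startKeyGroup;
        this.totalNumberOfKeyGroups = totalNumberOfKeyGroups;
        int keyGroupsInLocalRange = endKeyGroup - startKeyGroup + 1;
        // Same sizing idea as the constructor in the diff: spread the capacity over local key-groups.
        int perGroupCapacity = 1 + minimumCapacity / keyGroupsInLocalRange;
        this.mapsByKeyGroup = new HashMap[keyGroupsInLocalRange];
        for (int i = 0; i < keyGroupsInLocalRange; ++i) {
            mapsByKeyGroup[i] = new HashMap<>(perGroupCapacity);
        }
    }

    /** Simplified key-group assignment (assumption; Flink also murmur-hashes the hash code). */
    private int keyGroupOf(T element) {
        return Math.floorMod(keyExtractor.apply(element).hashCode(), totalNumberOfKeyGroups);
    }

    private Map<T, T> mapFor(T element) {
        int keyGroup = keyGroupOf(element);
        int localIndex = keyGroup - startKeyGroup;
        if (localIndex < 0 || localIndex >= mapsByKeyGroup.length) {
            throw new IllegalArgumentException("Key-group " + keyGroup + " is not in the local range");
        }
        return mapsByKeyGroup[localIndex];
    }

    /** Returns true if the element was new; only then should it also be pushed onto the heap. */
    boolean addIfAbsent(T element) {
        return mapFor(element).putIfAbsent(element, element) == null;
    }

    /** Removes and returns the stored instance that equals {@code element}, or null if absent. */
    T removeEqual(T element) {
        return mapFor(element).remove(element);
    }
}
```

Using a map instead of a set is what lets `removeEqual` hand back the stored instance, which matches the "return existing elements on unsuccessful adding" limitation mentioned under possible future improvements.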
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199814578 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java --- @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.runtime.state.InternalPriorityQueue; +import org.apache.flink.util.CloseableIterator; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE; + +/** + * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes + * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is + * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array + * index computations a bit simpler in the hot methods. 
Object identification of remove is based on object identity and + * not on equals. + * + * Possible future improvements: + * + * We could also implement shrinking for the heap. + * + * + * @param type of the contained elements. + */ +public class HeapPriorityQueue implements InternalPriorityQueue { + + /** +* The index of the head element in the array that represents the heap. +*/ + private static final int QUEUE_HEAD_INDEX = 1; + + /** +* Comparator for the contained elements. +*/ + private final Comparator elementComparator; + + /** +* The array that represents the heap-organized priority queue. +*/ + private T[] queue; + + /** +* The current size of the priority queue. +*/ + private int size; + + /** +* Creates an empty {@link HeapPriorityQueue} with the requested initial capacity. +* +* @param elementComparator comparator for the contained elements. +* @param minimumCapacity the minimum and initial capacity of this priority queue. +*/ + @SuppressWarnings("unchecked") + public HeapPriorityQueue( + @Nonnull Comparator elementComparator, + @Nonnegative int minimumCapacity) { + + this.elementComparator = elementComparator; + this.queue = (T[]) new HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity]; + } + + @Override + @Nullable + public T poll() { + return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : null; + } + + @Override + @Nullable + public T peek() { + return size() > 0 ? queue[QUEUE_HEAD_INDEX] : null; + } + + /** +* Adds the element to add to the heap. This element should not be managed by any other {@link HeapPriorityQueue}. +* +* @return true if the operation changed the head element or if is it unclear if the head element changed. +* Only returns false iff the head element was not changed by this operation. +*/ + @Override + public boolean add(@Nonnull T toAdd) { + return addInternal(toAdd); + } + + /** +* This remove is based on object identity, not the result of equals. 
+* +* @return true if the operation changed the head element or if is it unclear if the head element changed. +* Only returns false iff the head element was not changed by this operation. +*/ + @Override + public boolean remove(@Nonnull T toStop) { + return removeInternal(toStop); + } +
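The class comment above relies on two ideas. First, each element stores its own position in the heap array, so `remove` runs in O(log n) without a linear search and can compare by identity. Second, the array is 1-based, so the parent of slot `i` is `i / 2` and its children are `2 * i` and `2 * i + 1`. Below is a minimal sketch of both ideas. `IndexedElement`, `IndexedHeap`, and their members are hypothetical and only loosely modeled on the `HeapPriorityQueueElement` idea. They are not the classes in the PR, and growth is simplified to doubling via `Arrays.copyOf`.

```java
import java.util.Arrays;
import java.util.Comparator;

/** Hypothetical element that remembers its slot in the heap array (0 means "not in a heap"). */
class IndexedElement {
    final long priority;
    int heapIndex;

    IndexedElement(long priority) {
        this.priority = priority;
    }
}

/** Sketch of a 1-based binary min-heap with O(log n) removal by object identity. */
class IndexedHeap {
    private static final int HEAD = 1; // slot 0 stays unused
    private final Comparator<IndexedElement> cmp = Comparator.comparingLong(e -> e.priority);
    private IndexedElement[] queue = new IndexedElement[HEAD + 4];
    private int size;

    int size() { return size; }

    IndexedElement peek() { return size > 0 ? queue[HEAD] : null; }

    IndexedElement poll() { return size > 0 ? removeAt(HEAD) : null; }

    void add(IndexedElement e) {
        if (size + HEAD >= queue.length) {
            queue = Arrays.copyOf(queue, queue.length * 2); // simplified growth policy
        }
        moveTo(e, ++size);
        siftUp(size);
    }

    /** Uses the index stored in the element, so no equals()-based search is needed. */
    boolean remove(IndexedElement e) {
        int index = e.heapIndex;
        if (index < HEAD || index > size || queue[index] != e) {
            return false; // not managed by this heap
        }
        removeAt(index);
        return true;
    }

    private IndexedElement removeAt(int index) {
        IndexedElement removed = queue[index];
        IndexedElement last = queue[size];
        queue[size--] = null;
        removed.heapIndex = 0;
        if (index <= size) { // fill the hole with the former last element
            moveTo(last, index);
            if (index > HEAD && cmp.compare(last, queue[index >>> 1]) < 0) {
                siftUp(index);
            } else {
                siftDown(index);
            }
        }
        return removed;
    }

    private void siftUp(int index) {
        IndexedElement e = queue[index];
        while (index > HEAD) {
            int parent = index >>> 1; // 1-based: parent(i) = i / 2
            if (cmp.compare(e, queue[parent]) >= 0) {
                break;
            }
            moveTo(queue[parent], index);
            index = parent;
        }
        moveTo(e, index);
    }

    private void siftDown(int index) {
        IndexedElement e = queue[index];
        while ((index << 1) <= size) {
            int child = index << 1; // 1-based: children(i) = 2i and 2i + 1
            if (child < size && cmp.compare(queue[child + 1], queue[child]) < 0) {
                child++;
            }
            if (cmp.compare(e, queue[child]) <= 0) {
                break;
            }
            moveTo(queue[child], index);
            index = child;
        }
        moveTo(e, index);
    }

    private void moveTo(IndexedElement e, int index) {
        queue[index] = e;
        e.heapIndex = index;
    }
}
```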
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199813559 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java --- @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.runtime.state.InternalPriorityQueue; +import org.apache.flink.util.CloseableIterator; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE; + +/** + * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes + * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is + * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array + * index computations a bit simpler in the hot methods. 
Object identification of remove is based on object identity and + * not on equals. + * + * Possible future improvements: + * + * We could also implement shrinking for the heap. + * + * + * @param type of the contained elements. + */ +public class HeapPriorityQueue implements InternalPriorityQueue { + + /** +* The index of the head element in the array that represents the heap. +*/ + private static final int QUEUE_HEAD_INDEX = 1; + + /** +* Comparator for the contained elements. +*/ + private final Comparator elementComparator; + + /** +* The array that represents the heap-organized priority queue. +*/ + private T[] queue; + + /** +* The current size of the priority queue. +*/ + private int size; + + /** +* Creates an empty {@link HeapPriorityQueue} with the requested initial capacity. +* +* @param elementComparator comparator for the contained elements. +* @param minimumCapacity the minimum and initial capacity of this priority queue. +*/ + @SuppressWarnings("unchecked") + public HeapPriorityQueue( + @Nonnull Comparator elementComparator, + @Nonnegative int minimumCapacity) { + + this.elementComparator = elementComparator; + this.queue = (T[]) new HeapPriorityQueueElement[QUEUE_HEAD_INDEX + minimumCapacity]; + } + + @Override + @Nullable + public T poll() { + return size() > 0 ? removeElementAtIndex(QUEUE_HEAD_INDEX) : null; + } + + @Override + @Nullable + public T peek() { + return size() > 0 ? queue[QUEUE_HEAD_INDEX] : null; + } + + /** +* Adds the element to add to the heap. This element should not be managed by any other {@link HeapPriorityQueue}. +* +* @return true if the operation changed the head element or if is it unclear if the head element changed. +* Only returns false iff the head element was not changed by this operation. +*/ + @Override + public boolean add(@Nonnull T toAdd) { + return addInternal(toAdd); + } + + /** +* This remove is based on object identity, not the result of equals. 
+* +* @return true if the operation changed the head element or if is it unclear if the head element changed. +* Only returns false iff the head element was not changed by this operation. +*/ + @Override + public boolean remove(@Nonnull T toStop) { --- End diff -- 👍 ---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199813529 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapPriorityQueue.java --- @@ -0,0 +1,341 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.runtime.state.InternalPriorityQueue; +import org.apache.flink.util.CloseableIterator; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import static org.apache.flink.util.CollectionUtil.MAX_ARRAY_SIZE; + +/** + * Basic heap-based priority queue for {@link HeapPriorityQueueElement} objects. This heap supports fast deletes + * because it manages position indexes of the contained {@link HeapPriorityQueueElement}s. The heap implementation is + * a simple binary tree stored inside an array. Element indexes in the heap array start at 1 instead of 0 to make array + * index computations a bit simpler in the hot methods. 
Object identification of remove is based on object identity and --- End diff -- 👍 ---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199812541 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyExtractorFunction.java --- @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import javax.annotation.Nonnull; + +/** + * Function to extract a key from a given object. + * + * @param type of the element from which we extract the key. + */ +@FunctionalInterface +public interface KeyExtractorFunction { --- End diff -- I find it useful when concepts have names and some form of typing attached to them; otherwise, you end up with a lot of `Function<>` objects and have to think twice about their concrete use case. ---
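To illustrate the point about named functional types: a dedicated interface documents its purpose at every use site, and a bare `Function<T, Object>` does not. The interface body is cut off in the diff above, so the method name `extractKeyFromElement` and the `TimerEntry` class below are assumptions made for this sketch only.

```java
import java.util.function.Function;

/** Named functional interface: the type itself says what the function is for. */
@FunctionalInterface
interface KeyExtractorFunction<T> {
    Object extractKeyFromElement(T element); // method name assumed for this sketch
}

/** Hypothetical element type carrying a key and a timestamp. */
final class TimerEntry {
    final String key;
    final long timestamp;

    TimerEntry(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}

final class KeyExtractorDemo {
    // With a bare Function, the reader has to guess what the Object result means...
    static Object viaFunction(Function<TimerEntry, Object> f, TimerEntry t) {
        return f.apply(t);
    }

    // ...while the named interface makes the role explicit in signatures and fields.
    static Object viaNamedInterface(KeyExtractorFunction<TimerEntry> keyExtractor, TimerEntry t) {
        return keyExtractor.extractKeyFromElement(t);
    }
}
```

Both variants accept the same lambda. The difference is only in what a reader can infer from the parameter type.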
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199812074 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/InternalPriorityQueue.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.CloseableIterator; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collection; + +/** + * Interface for collection that gives in order access to elements w.r.t their priority. + * + * @param type of elements in the ordered set. + */ +@Internal +public interface InternalPriorityQueue { + + /** +* Retrieves and removes the first element (w.r.t. the order) of this set, +* or returns {@code null} if this set is empty. +* +* @return the first element of this ordered set, or {@code null} if this set is empty. +*/ + @Nullable + T poll(); + + /** +* Retrieves, but does not remove, the element (w.r.t. order) of this set, +* or returns {@code null} if this set is empty. +* +* @return the first element (w.r.t. 
order) of this ordered set, or {@code null} if this set is empty. +*/ + @Nullable + T peek(); + + /** +* Adds the given element to the set, if it is not already contained. +* +* @param toAdd the element to add to the set. +* @return true if the operation changed the head element or if is it unclear if the head element changed. +* Only returns false iff the head element was not changed by this operation. +*/ + boolean add(@Nonnull T toAdd); + + /** +* Removes the given element from the set, if is contained in the set. +* +* @param toRemove the element to remove. +* @return true if the operation changed the head element or if is it unclear if the head element changed. +* Only returns false iff the head element was not changed by this operation. --- End diff -- 👍 ---
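The `add`/`remove` contract above only requires `false` to be exact; a `true` may be conservative. Below is a minimal sketch of an implementation that honors the contract, built on `java.util.PriorityQueue` plus a `HashSet` for the set semantics. `SimpleOrderedSet` is hypothetical. This particular sketch happens to be exact in both directions, and unlike a heap with stored indexes, its `remove` is linear in the queue size.

```java
import java.util.Comparator;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical ordered set following the "did the head change?" return contract. */
final class SimpleOrderedSet<T> {
    private final PriorityQueue<T> queue;
    private final Set<T> members = new HashSet<>();

    SimpleOrderedSet(Comparator<T> comparator) {
        this.queue = new PriorityQueue<>(comparator);
    }

    T peek() {
        return queue.peek();
    }

    T poll() {
        T head = queue.poll();
        if (head != null) {
            members.remove(head);
        }
        return head;
    }

    /** Returns true only if toAdd became the new head; duplicates and non-head inserts return false. */
    boolean add(T toAdd) {
        if (!members.add(toAdd)) {
            return false; // already contained: nothing changed
        }
        queue.add(toAdd);
        return queue.peek() == toAdd; // a tie with the old head does not displace it
    }

    /** Returns true if the removed element was the head, i.e. the head changed. */
    boolean remove(T toRemove) {
        if (!members.remove(toRemove)) {
            return false; // not contained: nothing changed
        }
        boolean wasHead = toRemove.equals(queue.peek());
        queue.remove(toRemove); // O(n) in java.util.PriorityQueue
        return wasHead;
    }
}
```

A caller such as a timer service could use the return value to decide whether the next wake-up time needs to be rescheduled. That usage is an assumption about how the contract is meant to be consumed.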
[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6196 I had a few more comments, but they are all basically optimizations. I'll leave it up to you whether you want to address all or some of them; please let me know. Otherwise, we can merge this. 👍 ---
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199783587 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a composite type using array of its field serializers. + * Fields are indexed the same way as their serializers. + * + * @param type of custom serialized value + */ +public abstract class CompositeSerializer extends TypeSerializer { + private static final long serialVersionUID = 1L; + + /** Serializers for fields which constitute T. 
*/ + protected final TypeSerializer[] fieldSerializers; + + /** Whether T is an immutable type. */ + final boolean immutableTargetType; + + /** Byte length of target object in serialized form. */ + private final int length; + + /** Whether any field serializer is stateful. */ + private final boolean stateful; + + private final int hashCode; + + @SuppressWarnings("unchecked") + protected CompositeSerializer(boolean immutableTargetType, TypeSerializer ... fieldSerializers) { + Preconditions.checkNotNull(fieldSerializers); + Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull)); + this.immutableTargetType = immutableTargetType && + Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType); + this.fieldSerializers = (TypeSerializer[]) fieldSerializers; + this.length = calcLength(); + this.stateful = isStateful(); + this.hashCode = Arrays.hashCode(fieldSerializers); + } + + private boolean isStateful() { + TypeSerializer[] duplicatedSerializers = duplicateFieldSerializers(); + return IntStream.range(0, fieldSerializers.length) + .anyMatch(i -> fieldSerializers[i] != duplicatedSerializers[i]); + } + + /** Create new instance from its fields. */ + public abstract T createInstance(@Nonnull Object ... values); + + /** Modify field of existing instance. Supported only by mutable types. */ + protected abstract void setField(@Nonnull T value, int index, Object fieldValue); + + /** Get field of existing instance. */ + protected abstract Object getField(@Nonnull T value, int index); + + /** Factory for concrete serializer. */ + protected abstract CompositeSerializer createSerializerInstance(TypeSerializer ... originalSerializers); + + @Override + public CompositeSerializer duplicate() { + return stateful ? 
createSerializerInstance(duplicateFieldSerializers()) : this; --- End diff -- Another small point here for `createSerializerInstance(...)`: we have no (non-public) constructor that can also take all boolean flags, length, and (maybe) hash directly. So if we copy the serializer, I guess it always goes through the whole process again to figure this out, but we could just copy it from the previous instance. ---
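A minimal sketch of that suggestion, assuming a stripped-down stand-in for `CompositeSerializer`. The public constructor derives the properties once, and `duplicate()` passes them to a private constructor instead of recomputing them. `FieldSerializer` and `PrecomputedComposite` are hypothetical. The hash code is left out because reusing it is only safe if the field serializers define value-based `equals`/`hashCode`.

```java
import java.util.Arrays;

/** Hypothetical minimal view of a field serializer. */
interface FieldSerializer {
    FieldSerializer duplicate(); // returns this if the serializer is stateless
    boolean isImmutableType();
    int getLength();             // -1 for variable-length types
}

/** Sketch: derive properties once and let copies inherit them. */
final class PrecomputedComposite {
    final FieldSerializer[] fieldSerializers;
    final boolean immutableTargetType;
    final int length;
    final boolean stateful;

    /** Public path: derives every property from the field serializers. */
    PrecomputedComposite(boolean immutableTargetType, FieldSerializer... fieldSerializers) {
        this(fieldSerializers,
                immutableTargetType && Arrays.stream(fieldSerializers).allMatch(FieldSerializer::isImmutableType),
                calcLength(fieldSerializers),
                isStateful(fieldSerializers));
    }

    /** Copy path: takes already-known properties, so nothing is recomputed. */
    private PrecomputedComposite(FieldSerializer[] fieldSerializers, boolean immutableTargetType,
            int length, boolean stateful) {
        this.fieldSerializers = fieldSerializers;
        this.immutableTargetType = immutableTargetType;
        this.length = length;
        this.stateful = stateful;
    }

    PrecomputedComposite duplicate() {
        if (!stateful) {
            return this; // stateless: sharing the instance is safe
        }
        FieldSerializer[] duplicated = new FieldSerializer[fieldSerializers.length];
        for (int i = 0; i < duplicated.length; i++) {
            duplicated[i] = fieldSerializers[i].duplicate();
        }
        // Same field types, so the flags and the length carry over unchanged.
        return new PrecomputedComposite(duplicated, immutableTargetType, length, true);
    }

    private static int calcLength(FieldSerializer[] serializers) {
        int total = 0;
        for (FieldSerializer s : serializers) {
            if (s.getLength() < 0) {
                return -1;
            }
            total += s.getLength();
        }
        return total;
    }

    private static boolean isStateful(FieldSerializer[] serializers) {
        for (FieldSerializer s : serializers) {
            if (s.duplicate() != s) {
                return true;
            }
        }
        return false;
    }
}
```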
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199782860 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a composite type using array of its field serializers. + * Fields are indexed the same way as their serializers. + * + * @param type of custom serialized value + */ +public abstract class CompositeSerializer extends TypeSerializer { + private static final long serialVersionUID = 1L; + + /** Serializers for fields which constitute T. 
*/ + protected final TypeSerializer[] fieldSerializers; + + /** Whether T is an immutable type. */ + final boolean immutableTargetType; + + /** Byte length of target object in serialized form. */ + private final int length; + + /** Whether any field serializer is stateful. */ + private final boolean stateful; + + private final int hashCode; + + @SuppressWarnings("unchecked") + protected CompositeSerializer(boolean immutableTargetType, TypeSerializer ... fieldSerializers) { + Preconditions.checkNotNull(fieldSerializers); + Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull)); + this.immutableTargetType = immutableTargetType && + Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType); + this.fieldSerializers = (TypeSerializer[]) fieldSerializers; + this.length = calcLength(); + this.stateful = isStateful(); + this.hashCode = Arrays.hashCode(fieldSerializers); + } + + private boolean isStateful() { + TypeSerializer[] duplicatedSerializers = duplicateFieldSerializers(); --- End diff -- The flag computed by `isStateful()` is the only one that I suggested as a candidate for lazy initialization when `duplicate()` is called for the first time. The reason is that duplicating some types of inner serializers can be a bit expensive. But again, I feel that this can also be changed in follow-up work, if needed. ---
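A lazily initialized `stateful` flag, as suggested above, could look roughly like the following sketch. The trial duplication moves out of the constructor, and the first `duplicate()` call both computes the flag and, if needed, produces the copy. `Dup` and `LazyStatefulComposite` are hypothetical. The cached `Boolean` is unsynchronized on the assumption that a serializer instance is used by one thread at a time; a race would only cause a redundant recomputation.

```java
/** Hypothetical minimal serializer abstraction: duplicate() returns this when stateless. */
interface Dup {
    Dup duplicate();
}

/** Sketch: the stateful flag is computed on the first duplicate() call, not in the constructor. */
final class LazyStatefulComposite implements Dup {
    private final Dup[] fieldSerializers;
    private Boolean stateful; // null until first needed

    LazyStatefulComposite(Dup... fieldSerializers) {
        this(fieldSerializers, null); // no trial duplication here
    }

    private LazyStatefulComposite(Dup[] fieldSerializers, Boolean stateful) {
        this.fieldSerializers = fieldSerializers;
        this.stateful = stateful;
    }

    @Override
    public Dup duplicate() {
        if (Boolean.FALSE.equals(stateful)) {
            return this; // known to be stateless: share the instance
        }
        Dup[] duplicated = new Dup[fieldSerializers.length];
        boolean anyStateful = false;
        for (int i = 0; i < fieldSerializers.length; i++) {
            duplicated[i] = fieldSerializers[i].duplicate();
            anyStateful |= duplicated[i] != fieldSerializers[i];
        }
        stateful = anyStateful; // cached for all later calls
        // The copy is created with the flag already known, so it never recomputes it.
        return anyStateful ? new LazyStatefulComposite(duplicated, Boolean.TRUE) : this;
    }
}
```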
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199782303 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.IntStream; + +/** + * Base class for composite serializers. + * + * This class serializes a composite type using array of its field serializers. + * Fields are indexed the same way as their serializers. + * + * @param type of custom serialized value + */ +public abstract class CompositeSerializer extends TypeSerializer { + private static final long serialVersionUID = 1L; + + /** Serializers for fields which constitute T. 
*/ + protected final TypeSerializer[] fieldSerializers; + + /** Whether T is an immutable type. */ + final boolean immutableTargetType; + + /** Byte length of target object in serialized form. */ + private final int length; + + /** Whether any field serializer is stateful. */ + private final boolean stateful; + + private final int hashCode; + + @SuppressWarnings("unchecked") + protected CompositeSerializer(boolean immutableTargetType, TypeSerializer ... fieldSerializers) { + Preconditions.checkNotNull(fieldSerializers); + Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull)); + this.immutableTargetType = immutableTargetType && + Arrays.stream(fieldSerializers).allMatch(TypeSerializer::isImmutableType); + this.fieldSerializers = (TypeSerializer[]) fieldSerializers; + this.length = calcLength(); + this.stateful = isStateful(); + this.hashCode = Arrays.hashCode(fieldSerializers); --- End diff -- I think up to this point, the code iterates over `fieldSerializers` 5 times (null checks, immutability check, length calculation, stateful check, and hash code computation). It could be done in a single iteration, but since this constructor should typically not be called in hot loops, this is an optional improvement. ---
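For reference, the single-pass alternative could look like the following sketch. It folds the null check, the immutability check, the length sum, and the hash into one loop, and it uses the same recurrence as `Arrays.hashCode`. `SimpleSerializer` and `CompositeProperties` are hypothetical stand-ins, not Flink types. The stateful check is left out because it requires a trial duplication.

```java
import java.util.Objects;

/** Hypothetical minimal serializer view: immutability and fixed length (-1 if variable). */
interface SimpleSerializer {
    boolean isImmutableType();
    int getLength();
}

/** Sketch: derive all composite properties in one pass over the field serializers. */
final class CompositeProperties {
    final boolean immutableTargetType;
    final int length;
    final int hashCode;

    CompositeProperties(boolean immutableTargetType, SimpleSerializer... fieldSerializers) {
        Objects.requireNonNull(fieldSerializers, "fieldSerializers");
        boolean allImmutable = true;
        int totalLength = 0;
        int hash = 1; // same recurrence as Arrays.hashCode(Object[])
        for (SimpleSerializer s : fieldSerializers) {
            Objects.requireNonNull(s, "field serializer");
            allImmutable &= s.isImmutableType();
            int fieldLength = s.getLength();
            // Once any field is variable-length, the composite stays variable-length (-1).
            totalLength = (totalLength < 0 || fieldLength < 0) ? -1 : totalLength + fieldLength;
            hash = 31 * hash + s.hashCode();
        }
        this.immutableTargetType = immutableTargetType && allImmutable;
        this.length = totalLength;
        this.hashCode = hash;
    }
}
```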
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199485075 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Base class for composite serializers. + * + * This class serializes a composite type using array of its field serializers. + * Fields are indexed the same way as their serializers. 
+ * + * @param type of custom serialized value + */ +public abstract class CompositeSerializer extends TypeSerializer { + private static final long serialVersionUID = 1L; + + protected final TypeSerializer[] fieldSerializers; + final boolean isImmutableTargetType; --- End diff -- I think in Java code style, a boolean field name should not be prefixed with `is...`; only the getter should be prefixed with `is...`.
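Applied to this field, the convention would look roughly like the following sketch: the field has no prefix, and the accessor carries it. `NamingExample` and the accessor `isImmutableTargetType()` are hypothetical names for illustration and not necessarily what the PR ends up using.

```java
/** Sketch of the naming convention: no "is" prefix on the field, only on the getter. */
final class NamingExample {
    private final boolean immutableTargetType; // not "isImmutableTargetType"

    NamingExample(boolean immutableTargetType) {
        this.immutableTargetType = immutableTargetType;
    }

    boolean isImmutableTargetType() { // the getter carries the "is" prefix
        return immutableTargetType;
    }
}
```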
[GitHub] flink issue #6196: [FLINK-9513] Implement TTL state wrappers factory and ser...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6196 I had a few more comments, in particular some improvements for the new serializer. I think once those are addressed, this is good to merge. ---
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199477483 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +@SuppressWarnings("unchecked") +public class TtlStateFactory { --- End diff -- It might make sense to also add a test when we introduce a new class that provides some kind of "service", e.g. to check that all state types are mapped correctly and to prevent somebody from accidentally breaking the mapping. ---
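A minimal sketch of the kind of mapping test suggested above, assuming the factory keeps one mapping entry per state type. `StateKind`, `TtlWrapperRegistry` and `TtlWrapperRegistryCheck` are hypothetical stand-ins, not Flink classes; the enum constants mirror the descriptor types imported in the diff, and the wrapper names are plain strings used only for illustration.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical state kinds, mirroring the descriptor types imported by the factory. */
enum StateKind { VALUE, LIST, MAP, REDUCING, AGGREGATING, FOLDING }

/** Hypothetical "service" holding the mapping from state kind to TTL wrapper. */
final class TtlWrapperRegistry {
    static Map<StateKind, String> createMapping() {
        Map<StateKind, String> mapping = new EnumMap<>(StateKind.class);
        mapping.put(StateKind.VALUE, "TtlValueState");
        mapping.put(StateKind.LIST, "TtlListState");
        mapping.put(StateKind.MAP, "TtlMapState");
        mapping.put(StateKind.REDUCING, "TtlReducingState");
        mapping.put(StateKind.AGGREGATING, "TtlAggregatingState");
        mapping.put(StateKind.FOLDING, "TtlFoldingState");
        return mapping;
    }
}

final class TtlWrapperRegistryCheck {
    /** Returns the kinds without a wrapper; a unit test would assert that this set is empty. */
    static Set<StateKind> findUnmappedKinds(Map<StateKind, String> mapping) {
        // Starting from allOf() means a newly added kind without a mapping is reported automatically.
        Set<StateKind> missing = EnumSet.allOf(StateKind.class);
        missing.removeAll(mapping.keySet());
        return missing;
    }

    public static void main(String[] args) {
        Set<StateKind> missing = findUnmappedKinds(TtlWrapperRegistry.createMapping());
        if (!missing.isEmpty()) {
            throw new AssertionError("State kinds without TTL wrapper: " + missing);
        }
        System.out.println("all state kinds mapped");
    }
}
```

Because the expected set is derived from the enum rather than written out by hand, the check fails as soon as somebody adds a kind or removes a mapping entry.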
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199476870 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java --- @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.state.AggregatingStateDescriptor; +import org.apache.flink.api.common.state.FoldingStateDescriptor; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.CompositeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.KeyedStateFactory; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This state factory wraps state objects, produced by backends, with TTL logic. + */ +@SuppressWarnings("unchecked") --- End diff -- I would not suppress warnings in the scope of the full class; it is better to suppress them in a more fine-grained way on individual methods. ---
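A small sketch of scoping `@SuppressWarnings("unchecked")` to where an unchecked cast actually happens, either on a single method or on a single local variable declaration. `ScopedSuppression` is a hypothetical class, not code from the PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical class showing unchecked-warning suppression scoped as narrowly as possible. */
final class ScopedSuppression {

    // No class-level @SuppressWarnings: every other method keeps full compiler checks.

    /** The unchecked cast is confined to this method, so only this method carries the annotation. */
    @SuppressWarnings("unchecked")
    static <T> List<T> castList(Object rawList) {
        return (List<T>) rawList;
    }

    /** Even narrower: only the local variable declaration that needs the cast is annotated. */
    static <T> T firstElement(Object rawList) {
        @SuppressWarnings("unchecked")
        List<T> list = (List<T>) rawList;
        return list.get(0);
    }

    public static void main(String[] args) {
        List<String> strings = new ArrayList<>();
        strings.add("a");
        Object erased = strings;
        List<String> back = castList(erased);
        String first = firstElement(erased);
        System.out.println(back.size() + " " + first); // prints "1 a"
    }
}
```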
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199474873 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Base class for composite serializers. + * + * This class serializes a composite type using array of its field serializers. + * Fields are indexed the same way as their serializers. 
+ * + * @param type of custom serialized value + */ +public abstract class CompositeSerializer extends TypeSerializer { + private static final long serialVersionUID = 1L; + + protected final TypeSerializer[] fieldSerializers; + final boolean isImmutableTargetType; + private final int length; + + @SuppressWarnings("unchecked") + protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer ... fieldSerializers) { + Preconditions.checkNotNull(fieldSerializers); + Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull)); + this.isImmutableTargetType = isImmutableTargetType; + this.fieldSerializers = (TypeSerializer[]) fieldSerializers; + this.length = calcLength(); + } + + /** Create new instance from its fields. */ + public abstract T createInstance(@Nonnull Object ... values); + + /** Modify field of existing instance. Supported only by mutable types. */ + protected abstract void setField(@Nonnull T value, int index, Object fieldValue); + + /** Get field of existing instance. */ + protected abstract Object getField(@Nonnull T value, int index); + + /** Factory for concrete serializer. */ + protected abstract CompositeSerializer createSerializerInstance(TypeSerializer ... originalSerializers); + + @Override + public CompositeSerializer duplicate() { + TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length]; + boolean stateful = false; + for (int index = 0; index < fieldSerializers.length; index++) { + duplicatedSerializers[index] = fieldSerializers[index].duplicate(); + if (fieldSerializers[index] != duplicatedSerializers[index]) { + stateful = true; + } + } + return stateful ? createSerializerInstance(duplicatedSerializers) : this; + } + + @Override + public boolean isImmutableType() { + for (TypeSerializer fieldSerializer : fieldSerializers) { --- End diff -- We can already compute many things, like length, immutability, etc., in the constructor.
Statelessness is the one property that we might instead want to figure out on the first `duplicate()` call and then remember. ---
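A minimal sketch of computing derived properties once in the constructor, so that the getters become plain field reads. `FieldSerializer`, `FixedField` and `PrecomputingComposite` are hypothetical simplifications, not Flink's `TypeSerializer` API. The length rule (sum of the fixed field lengths, or -1 as soon as one field has variable length) is an assumption, since the body of `calcLength()` is not shown in the diff.

```java
import java.util.Arrays;
import java.util.Objects;

/** Hypothetical stand-in for a field serializer; not Flink's TypeSerializer. */
interface FieldSerializer {
    boolean isImmutableType();

    /** Serialized length in bytes, or -1 for variable-length types. */
    int getLength();
}

/** Hypothetical field implementation used only to exercise the sketch. */
final class FixedField implements FieldSerializer {
    private final boolean immutable;
    private final int length;

    FixedField(boolean immutable, int length) {
        this.immutable = immutable;
        this.length = length;
    }

    @Override
    public boolean isImmutableType() {
        return immutable;
    }

    @Override
    public int getLength() {
        return length;
    }
}

/** Sketch: immutability and length are derived once, in the constructor. */
final class PrecomputingComposite {
    private final FieldSerializer[] fieldSerializers; // used by the (omitted) serialize/deserialize methods
    private final boolean immutableType;
    private final int length;

    PrecomputingComposite(boolean immutableTargetType, FieldSerializer... fieldSerializers) {
        Objects.requireNonNull(fieldSerializers);
        if (!Arrays.stream(fieldSerializers).allMatch(Objects::nonNull)) {
            throw new IllegalArgumentException("field serializers must not be null");
        }
        this.fieldSerializers = fieldSerializers;
        // Immutable only if the target type itself and every field type are immutable.
        this.immutableType = immutableTargetType
                && Arrays.stream(fieldSerializers).allMatch(FieldSerializer::isImmutableType);
        this.length = computeLength(fieldSerializers);
    }

    /** Assumed rule: sum of the field lengths, or -1 if any field has variable length. */
    private static int computeLength(FieldSerializer[] serializers) {
        int total = 0;
        for (FieldSerializer serializer : serializers) {
            int fieldLength = serializer.getLength();
            if (fieldLength < 0) {
                return -1;
            }
            total += fieldLength;
        }
        return total;
    }

    boolean isImmutableType() {
        return immutableType; // no loop over the fields on every call
    }

    int getLength() {
        return length;
    }

    public static void main(String[] args) {
        PrecomputingComposite fixed = new PrecomputingComposite(true, new FixedField(true, 8), new FixedField(true, 4));
        System.out.println(fixed.isImmutableType() + " " + fixed.getLength()); // prints "true 12"
        PrecomputingComposite mixed = new PrecomputingComposite(true, new FixedField(false, 8), new FixedField(true, -1));
        System.out.println(mixed.isImmutableType() + " " + mixed.getLength()); // prints "false -1"
    }
}
```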
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199474421 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Base class for composite serializers. + * + * This class serializes a composite type using array of its field serializers. + * Fields are indexed the same way as their serializers. 
+ * + * @param type of custom serialized value + */ +public abstract class CompositeSerializer extends TypeSerializer { + private static final long serialVersionUID = 1L; + + protected final TypeSerializer[] fieldSerializers; + final boolean isImmutableTargetType; + private final int length; + + @SuppressWarnings("unchecked") + protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer ... fieldSerializers) { + Preconditions.checkNotNull(fieldSerializers); + Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull)); + this.isImmutableTargetType = isImmutableTargetType; + this.fieldSerializers = (TypeSerializer[]) fieldSerializers; + this.length = calcLength(); + } + + /** Create new instance from its fields. */ + public abstract T createInstance(@Nonnull Object ... values); + + /** Modify field of existing instance. Supported only by mutable types. */ + protected abstract void setField(@Nonnull T value, int index, Object fieldValue); + + /** Get field of existing instance. */ + protected abstract Object getField(@Nonnull T value, int index); + + /** Factory for concrete serializer. */ + protected abstract CompositeSerializer createSerializerInstance(TypeSerializer ... originalSerializers); + + @Override + public CompositeSerializer duplicate() { + TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length]; + boolean stateful = false; + for (int index = 0; index < fieldSerializers.length; index++) { + duplicatedSerializers[index] = fieldSerializers[index].duplicate(); + if (fieldSerializers[index] != duplicatedSerializers[index]) { + stateful = true; + } + } + return stateful ? 
createSerializerInstance(duplicatedSerializers) : this; + } + + @Override + public boolean isImmutableType() { + for (TypeSerializer fieldSerializer : fieldSerializers) { + if (!fieldSerializer.isImmutableType()) { + return false; + } + } + return isImmutableTargetType; + } + + @Override + public T createInstance() { + Object[] fields = new Object[fieldSerializers.length]; + for (int index = 0; index < fieldSerializers.length; index++) { + fields[index] = fieldSerializers[index].createInstance(); + } + return createInstance(fields); + } + + @Override + public T copy(T from) { + Preconditions.checkNotNull(from); + Object[] fields = new Object[fieldSerializers.length]; + for (int index = 0; index < fieldSerializers.length; index++)
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199474494 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Base class for composite serializers. + * + * This class serializes a composite type using array of its field serializers. + * Fields are indexed the same way as their serializers. 
+ * + * @param type of custom serialized value + */ +public abstract class CompositeSerializer extends TypeSerializer { + private static final long serialVersionUID = 1L; + + protected final TypeSerializer[] fieldSerializers; + final boolean isImmutableTargetType; + private final int length; + + @SuppressWarnings("unchecked") + protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer ... fieldSerializers) { + Preconditions.checkNotNull(fieldSerializers); + Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull)); + this.isImmutableTargetType = isImmutableTargetType; + this.fieldSerializers = (TypeSerializer[]) fieldSerializers; + this.length = calcLength(); + } + + /** Create new instance from its fields. */ + public abstract T createInstance(@Nonnull Object ... values); + + /** Modify field of existing instance. Supported only by mutable types. */ + protected abstract void setField(@Nonnull T value, int index, Object fieldValue); + + /** Get field of existing instance. */ + protected abstract Object getField(@Nonnull T value, int index); + + /** Factory for concrete serializer. */ + protected abstract CompositeSerializer createSerializerInstance(TypeSerializer ... originalSerializers); + + @Override + public CompositeSerializer duplicate() { + TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length]; + boolean stateful = false; + for (int index = 0; index < fieldSerializers.length; index++) { + duplicatedSerializers[index] = fieldSerializers[index].duplicate(); + if (fieldSerializers[index] != duplicatedSerializers[index]) { + stateful = true; + } + } + return stateful ? 
createSerializerInstance(duplicatedSerializers) : this; + } + + @Override + public boolean isImmutableType() { + for (TypeSerializer fieldSerializer : fieldSerializers) { + if (!fieldSerializer.isImmutableType()) { + return false; + } + } + return isImmutableTargetType; + } + + @Override + public T createInstance() { + Object[] fields = new Object[fieldSerializers.length]; + for (int index = 0; index < fieldSerializers.length; index++) { + fields[index] = fieldSerializers[index].createInstance(); + } + return createInstance(fields); + } + + @Override + public T copy(T from) { + Preconditions.checkNotNull(from); + Object[] fields = new Object[fieldSerializers.length]; + for (int index = 0; index < fieldSerializers.length; index++)
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199474258 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Base class for composite serializers. + * + * This class serializes a composite type using array of its field serializers. + * Fields are indexed the same way as their serializers. 
+ * + * @param type of custom serialized value + */ +public abstract class CompositeSerializer extends TypeSerializer { + private static final long serialVersionUID = 1L; + + protected final TypeSerializer[] fieldSerializers; + final boolean isImmutableTargetType; + private final int length; + + @SuppressWarnings("unchecked") + protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer ... fieldSerializers) { + Preconditions.checkNotNull(fieldSerializers); + Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull)); + this.isImmutableTargetType = isImmutableTargetType; + this.fieldSerializers = (TypeSerializer[]) fieldSerializers; + this.length = calcLength(); + } + + /** Create new instance from its fields. */ + public abstract T createInstance(@Nonnull Object ... values); + + /** Modify field of existing instance. Supported only by mutable types. */ + protected abstract void setField(@Nonnull T value, int index, Object fieldValue); + + /** Get field of existing instance. */ + protected abstract Object getField(@Nonnull T value, int index); + + /** Factory for concrete serializer. */ + protected abstract CompositeSerializer createSerializerInstance(TypeSerializer ... originalSerializers); + + @Override + public CompositeSerializer duplicate() { + TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length]; + boolean stateful = false; + for (int index = 0; index < fieldSerializers.length; index++) { + duplicatedSerializers[index] = fieldSerializers[index].duplicate(); + if (fieldSerializers[index] != duplicatedSerializers[index]) { + stateful = true; + } + } + return stateful ? createSerializerInstance(duplicatedSerializers) : this; + } + + @Override + public boolean isImmutableType() { + for (TypeSerializer fieldSerializer : fieldSerializers) { --- End diff -- Why not compute this once in the constructor and remember it in a boolean flag? ---
[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6196#discussion_r199474152 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Base class for composite serializers. + * + * This class serializes a composite type using array of its field serializers. + * Fields are indexed the same way as their serializers. 
+ * + * @param type of custom serialized value + */ +public abstract class CompositeSerializer extends TypeSerializer { + private static final long serialVersionUID = 1L; + + protected final TypeSerializer[] fieldSerializers; + final boolean isImmutableTargetType; + private final int length; + + @SuppressWarnings("unchecked") + protected CompositeSerializer(boolean isImmutableTargetType, TypeSerializer ... fieldSerializers) { + Preconditions.checkNotNull(fieldSerializers); + Preconditions.checkArgument(Arrays.stream(fieldSerializers).allMatch(Objects::nonNull)); + this.isImmutableTargetType = isImmutableTargetType; + this.fieldSerializers = (TypeSerializer[]) fieldSerializers; + this.length = calcLength(); + } + + /** Create new instance from its fields. */ + public abstract T createInstance(@Nonnull Object ... values); + + /** Modify field of existing instance. Supported only by mutable types. */ + protected abstract void setField(@Nonnull T value, int index, Object fieldValue); + + /** Get field of existing instance. */ + protected abstract Object getField(@Nonnull T value, int index); + + /** Factory for concrete serializer. */ + protected abstract CompositeSerializer createSerializerInstance(TypeSerializer ... originalSerializers); + + @Override + public CompositeSerializer duplicate() { + TypeSerializer[] duplicatedSerializers = new TypeSerializer[fieldSerializers.length]; + boolean stateful = false; + for (int index = 0; index < fieldSerializers.length; index++) { + duplicatedSerializers[index] = fieldSerializers[index].duplicate(); + if (fieldSerializers[index] != duplicatedSerializers[index]) { --- End diff -- I wonder whether we need to do these checks every time `duplicate()` is called. We could check once, remember whether all field serializers are stateless, and from that point on return `this` immediately. ---
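A sketch of remembering statelessness after the first check, so that later `duplicate()` calls return `this` without touching the field serializers. `DuplicableSerializer`, `CachingComposite` and the two field classes are hypothetical simplifications, not Flink's API; the sketch assumes that whether a field serializer is stateful never changes over its lifetime, so a positive result can safely be cached.

```java
/** Hypothetical stand-in: duplicate() returns this when the serializer is stateless. */
interface DuplicableSerializer {
    DuplicableSerializer duplicate();
}

/** Sketch of a composite that caches the "all fields are stateless" result. */
final class CachingComposite implements DuplicableSerializer {
    private final DuplicableSerializer[] fieldSerializers;

    // volatile so that threads calling duplicate() on a shared instance see the cached result.
    private volatile boolean allFieldsStateless;

    CachingComposite(DuplicableSerializer... fieldSerializers) {
        this.fieldSerializers = fieldSerializers;
    }

    @Override
    public DuplicableSerializer duplicate() {
        if (allFieldsStateless) {
            return this; // fast path: no per-field duplicate() calls any more
        }
        DuplicableSerializer[] duplicated = new DuplicableSerializer[fieldSerializers.length];
        boolean stateful = false;
        for (int index = 0; index < fieldSerializers.length; index++) {
            duplicated[index] = fieldSerializers[index].duplicate();
            if (duplicated[index] != fieldSerializers[index]) {
                stateful = true;
            }
        }
        if (stateful) {
            return new CachingComposite(duplicated);
        }
        allFieldsStateless = true; // remember the outcome of the first check
        return this;
    }
}

/** Stateless field serializer that counts its duplicate() invocations. */
final class CountingStatelessField implements DuplicableSerializer {
    int duplicateCalls;

    @Override
    public DuplicableSerializer duplicate() {
        duplicateCalls++;
        return this;
    }
}

/** Stateful field serializer: every duplicate() returns a fresh instance. */
final class StatefulField implements DuplicableSerializer {
    @Override
    public DuplicableSerializer duplicate() {
        return new StatefulField();
    }
}
```

With `new CachingComposite(new CountingStatelessField())`, calling `duplicate()` twice invokes the field's `duplicate()` only once; a composite containing a `StatefulField` keeps returning fresh copies.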
[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6228 @sihuazhou thanks for the fast review. I addressed all your comments. ---
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199442186 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.FlinkRuntimeException; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.NoSuchElementException; + +/** + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore} + * based on RocksDB. + * + * IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences + * produced by the provided serializer for the elements! + * + * @param the type of stored elements. + */ +public class RocksDBOrderedStore implements CachingInternalPriorityQueueSet.OrderedSetStore { + + /** Serialized empty value to insert into RocksDB. */ + private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET); + + /** The RocksDB instance that serves as store. */ + @Nonnull + private final RocksDB db; + + /** Handle to the column family of the RocksDB instance in which the elements are stored. */ + @Nonnull + private final ColumnFamilyHandle columnFamilyHandle; + + /** Read options for RocksDB. */ + @Nonnull + private final ReadOptions readOptions; + + /** +* Serializer for the contained elements. 
The lexicographical order of the bytes of serialized objects must be +* aligned with their logical order. +*/ + @Nonnull + private final TypeSerializer byteOrderProducingSerializer; + + /** Wrapper to batch all writes to RocksDB. */ + @Nonnull + private final RocksDBWriteBatchWrapper batchWrapper; + + /** The key-group id of all elements stored in this instance. */ + @Nonnegative + private final int keyGroupId; + + /** The key-group id in serialized form. */ + @Nonnull + private final byte[] groupPrefixBytes; + + /** Output stream that helps to serialize elements. */ + @Nonnull + private final ByteArrayOutputStreamWithPos outputStream; + + /** Output view that helps to serialize elements, must wrap the output stream. */ + @Nonnull + private final DataOutputViewStreamWrapper outputView; + + public RocksDBOrderedStore( + @Nonnegative int keyGroupId, + @Nonnull RocksDB db, + @Nonnull ColumnFamilyHandle columnFamilyHandle, + @Nonnull ReadOptions readOptions, + @Nonnull TypeSerializer byteOrderProducingSerializer, + @Nonnull ByteArrayOutputStreamWithPos outputStream, + @Nonnull DataOutputViewStreamWrapper outputView, + @Nonnull RocksDBWriteBatchWrapper batchWrapper) { + this.db = db; + this.columnFamilyHandle = columnFamilyHandle; +
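To make the ordering requirement in the Javadoc concrete, here is a sketch assuming elements are serialized as a 2-byte big-endian key-group prefix followed by an 8-byte big-endian `long`. This layout and `ByteOrderDemo` are illustrative assumptions, not the PR's format. `Arrays.compareUnsigned` (Java 9+) compares byte arrays lexicographically as unsigned bytes, which is the ordering of RocksDB's default bytewise comparator. Non-negative values sort in numeric order; a negative `long` does not, because its sign bit makes its first byte large.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

final class ByteOrderDemo {

    /** Hypothetical layout: 2-byte key-group prefix, then an 8-byte timestamp, both big-endian. */
    static byte[] serialize(int keyGroup, long timestamp) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeShort(keyGroup); // DataOutputStream always writes big-endian
            out.writeLong(timestamp);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Unsigned lexicographic comparison, the same ordering as a bytewise (memcmp-style) comparator. */
    static int compare(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    public static void main(String[] args) {
        // Non-negative timestamps: byte order agrees with numeric order.
        System.out.println(compare(serialize(3, 10L), serialize(3, 256L)) < 0); // true
        // The key-group prefix dominates: every element of group 3 sorts before group 4.
        System.out.println(compare(serialize(3, Long.MAX_VALUE), serialize(4, 0L)) < 0); // true
        // Negative values break the alignment: -1L is 0xFF...FF and sorts after 0L.
        System.out.println(compare(serialize(3, -1L), serialize(3, 0L)) > 0); // true
    }
}
```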
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199427083 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.FlinkRuntimeException; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.NoSuchElementException; + +/** + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore} + * based on RocksDB. + * + * IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences + * produced by the provided serializer for the elements! + * + * @param the type of stored elements. + */ +public class RocksDBOrderedStore implements CachingInternalPriorityQueueSet.OrderedSetStore { + + /** Serialized empty value to insert into RocksDB. */ + private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET); + + /** The RocksDB instance that serves as store. */ + @Nonnull + private final RocksDB db; + + /** Handle to the column family of the RocksDB instance in which the elements are stored. */ + @Nonnull + private final ColumnFamilyHandle columnFamilyHandle; + + /** Read options for RocksDB. */ + @Nonnull + private final ReadOptions readOptions; + + /** +* Serializer for the contained elements. 
The lexicographical order of the bytes of serialized objects must be +* aligned with their logical order. +*/ + @Nonnull + private final TypeSerializer byteOrderProducingSerializer; + + /** Wrapper to batch all writes to RocksDB. */ + @Nonnull + private final RocksDBWriteBatchWrapper batchWrapper; + + /** The key-group id of all elements stored in this instance. */ + @Nonnegative + private final int keyGroupId; + + /** The key-group id in serialized form. */ + @Nonnull + private final byte[] groupPrefixBytes; + + /** Output stream that helps to serialize elements. */ + @Nonnull + private final ByteArrayOutputStreamWithPos outputStream; + + /** Output view that helps to serialize elements, must wrap the output stream. */ + @Nonnull + private final DataOutputViewStreamWrapper outputView; + + public RocksDBOrderedStore( + @Nonnegative int keyGroupId, + @Nonnull RocksDB db, + @Nonnull ColumnFamilyHandle columnFamilyHandle, + @Nonnull ReadOptions readOptions, + @Nonnull TypeSerializer byteOrderProducingSerializer, + @Nonnull ByteArrayOutputStreamWithPos outputStream, + @Nonnull DataOutputViewStreamWrapper outputView, + @Nonnull RocksDBWriteBatchWrapper batchWrapper) { + this.db = db; + this.columnFamilyHandle = columnFamilyHandle; +
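A sketch of how a store scoped to one key group could find its smallest element: seek to the first key that is greater than or equal to the group prefix, then check that this key still starts with the prefix. A `TreeMap` ordered by `Arrays.compareUnsigned` stands in for RocksDB and an iterator seek; `PrefixScanDemo` and the 2-byte-prefix key layout are assumptions for illustration (Java 9+), and values are a dummy byte in the spirit of `DUMMY_BYTES` in the diff.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;

final class PrefixScanDemo {
    /** Unsigned lexicographic order, like RocksDB's default bytewise comparator. */
    private static final Comparator<byte[]> BYTEWISE = Arrays::compareUnsigned;
    private static final byte[] DUMMY_VALUE = {'0'};

    private final NavigableMap<byte[], byte[]> store = new TreeMap<>(BYTEWISE);

    /** Hypothetical key layout: 2-byte key-group prefix, then an 8-byte timestamp (big-endian). */
    static byte[] key(int keyGroup, long timestamp) {
        return ByteBuffer.allocate(Short.BYTES + Long.BYTES).putShort((short) keyGroup).putLong(timestamp).array();
    }

    static byte[] prefix(int keyGroup) {
        return ByteBuffer.allocate(Short.BYTES).putShort((short) keyGroup).array();
    }

    void add(int keyGroup, long timestamp) {
        // All information lives in the key; the value is only a placeholder.
        store.put(key(keyGroup, timestamp), DUMMY_VALUE);
    }

    /** Returns the smallest timestamp stored for the key group, or null if there is none. */
    Long peekFirst(int keyGroup) {
        byte[] groupPrefix = prefix(keyGroup);
        // Equivalent of seeking an iterator to the prefix: the first key >= prefix.
        byte[] candidate = store.ceilingKey(groupPrefix);
        if (candidate == null || !startsWith(candidate, groupPrefix)) {
            return null; // the seek landed beyond this key group
        }
        return ByteBuffer.wrap(candidate, Short.BYTES, Long.BYTES).getLong();
    }

    private static boolean startsWith(byte[] bytes, byte[] prefix) {
        return bytes.length >= prefix.length
                && Arrays.equals(bytes, 0, prefix.length, prefix, 0, prefix.length);
    }

    public static void main(String[] args) {
        PrefixScanDemo demo = new PrefixScanDemo();
        demo.add(3, 50L);
        demo.add(3, 7L);
        demo.add(4, 1L);
        System.out.println(demo.peekFirst(3)); // prints "7"
        System.out.println(demo.peekFirst(4)); // prints "1"
        System.out.println(demo.peekFirst(5)); // prints "null"
    }
}
```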
[GitHub] flink pull request #6228: [FLINK-9491] Implement timer data structure based ...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6228#discussion_r199426052 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOrderedStore.java --- @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet; +import org.apache.flink.util.CloseableIterator; +import org.apache.flink.util.FlinkRuntimeException; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.NoSuchElementException; + +/** + * Implementation of {@link org.apache.flink.runtime.state.heap.CachingInternalPriorityQueueSet.OrderedSetStore} + * based on RocksDB. + * + * IMPORTANT: The store is ordered and the order is determined by the lexicographic order of the byte sequences + * produced by the provided serializer for the elements! + * + * @param the type of stored elements. + */ +public class RocksDBOrderedStore implements CachingInternalPriorityQueueSet.OrderedSetStore { + + /** Serialized empty value to insert into RocksDB. */ + private static final byte[] DUMMY_BYTES = "0".getBytes(ConfigConstants.DEFAULT_CHARSET); + + /** The RocksDB instance that serves as store. */ + @Nonnull + private final RocksDB db; + + /** Handle to the column family of the RocksDB instance in which the elements are stored. */ + @Nonnull + private final ColumnFamilyHandle columnFamilyHandle; + + /** Read options for RocksDB. */ + @Nonnull + private final ReadOptions readOptions; + + /** +* Serializer for the contained elements. 
The lexicographical order of the bytes of serialized objects must be +* aligned with their logical order. +*/ + @Nonnull + private final TypeSerializer byteOrderProducingSerializer; + + /** Wrapper to batch all writes to RocksDB. */ + @Nonnull + private final RocksDBWriteBatchWrapper batchWrapper; + + /** The key-group id of all elements stored in this instance. */ + @Nonnegative + private final int keyGroupId; + + /** The key-group id in serialized form. */ + @Nonnull + private final byte[] groupPrefixBytes; + + /** Output stream that helps to serialize elements. */ + @Nonnull + private final ByteArrayOutputStreamWithPos outputStream; + + /** Output view that helps to serialize elements, must wrap the output stream. */ + @Nonnull + private final DataOutputViewStreamWrapper outputView; + + public RocksDBOrderedStore( + @Nonnegative int keyGroupId, + @Nonnull RocksDB db, + @Nonnull ColumnFamilyHandle columnFamilyHandle, + @Nonnull ReadOptions readOptions, + @Nonnull TypeSerializer byteOrderProducingSerializer, + @Nonnull ByteArrayOutputStreamWithPos outputStream, + @Nonnull DataOutputViewStreamWrapper outputView, + @Nonnull RocksDBWriteBatchWrapper batchWrapper) { + this.db = db; + this.columnFamilyHandle = columnFamilyHandle; +
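The fields `keyGroupId` and `groupPrefixBytes` suggest that each stored key begins with the serialized key-group id. Under that layout, all elements of one key-group form a contiguous range in byte order. Within a range, the shared prefix means the element bytes alone decide the order, and the serializer contract keeps that order logical. The sketch below assumes a two-byte big-endian prefix; Flink's actual prefix width and encoding may differ. All names are hypothetical.

```java
import java.util.Arrays;

/** Hypothetical sketch of key-group prefixed keys; names and the 2-byte width are assumptions. */
final class KeyGroupPrefixSketch {

    private KeyGroupPrefixSketch() {}

    /** Serializes the key-group id as two big-endian bytes (assumes 0 <= id <= 65535). */
    static byte[] groupPrefix(int keyGroupId) {
        if (keyGroupId < 0 || keyGroupId > 0xFFFF) {
            throw new IllegalArgumentException("key-group id out of range: " + keyGroupId);
        }
        return new byte[] {(byte) (keyGroupId >>> 8), (byte) keyGroupId};
    }

    /** Concatenates the key-group prefix and the serialized element into one store key. */
    static byte[] prefixedKey(byte[] prefix, byte[] elementBytes) {
        byte[] key = Arrays.copyOf(prefix, prefix.length + elementBytes.length);
        System.arraycopy(elementBytes, 0, key, prefix.length, elementBytes.length);
        return key;
    }

    /** Returns true if the key starts with the given key-group prefix. */
    static boolean hasPrefix(byte[] key, byte[] prefix) {
        if (key.length < prefix.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (key[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    /** Unsigned lexicographic comparison, as a bytewise comparator would order keys. */
    static int compareUnsigned(byte[] a, byte[] b) {
        int len = Math.min(a.length, b.length);
        for (int i = 0; i < len; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        byte[] group1 = groupPrefix(1);
        byte[] group2 = groupPrefix(2);
        // An all-0xFF element in group 1 still sorts before an all-zero element in group 2.
        byte[] lastOfGroup1 = prefixedKey(group1, new byte[] {(byte) 0xFF, (byte) 0xFF});
        byte[] firstOfGroup2 = prefixedKey(group2, new byte[] {0x00});
        System.out.println(compareUnsigned(lastOfGroup1, firstOfGroup2) < 0);
        System.out.println(hasPrefix(lastOfGroup1, group1) + " " + hasPrefix(firstOfGroup2, group1));
    }
}
```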
[GitHub] flink issue #5799: [FLINK-7775] Remove unreferenced method PermanentBlobCach...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/5799 LGTM. Merging. ---
[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6186 Merging this. ---
[GitHub] flink issue #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrappers
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6186 Had one more minor comment. Besides that, this looks good. Nice job! ---
[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6186#discussion_r199109435 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.util.FlinkRuntimeException; + +import java.util.AbstractMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * This class wraps map state with TTL logic. 
+ * + * @param The type of key the state is associated to + * @param The type of the namespace + * @param Type of the user entry key of state with TTL + * @param Type of the user entry value of state with TTL + */ +class TtlMapState + extends AbstractTtlState, Map>, InternalMapState>> + implements InternalMapState { + TtlMapState( + InternalMapState> original, + TtlConfig config, + TtlTimeProvider timeProvider, + TypeSerializer> valueSerializer) { + super(original, config, timeProvider, valueSerializer); + } + + @Override + public UV get(UK key) throws Exception { + return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key)); + } + + @Override + public void put(UK key, UV value) throws Exception { + original.put(key, wrapWithTs(value)); + } + + @Override + public void putAll(Map map) throws Exception { + if (map == null) { + return; + } + Map> ttlMap = new HashMap<>(); --- End diff -- We can already initialize the new map with `map.size()`. ---
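The sketch below shows the review suggestion in context: `putAll` presizes the wrapping map with `map.size()`, and a `get` method shows the TTL check. It rests on several assumptions. `TtlValue`, `wrapWithTs`, the fixed `ttlMillis`, and the mutable clock are simplified stand-ins, not Flink's actual classes. The backing state is a plain `Map`, and the `get` does not refresh timestamps on read.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified sketch of TTL wrapping; not Flink's actual TtlMapState. */
final class TtlMapSketch<UK, UV> {

    /** Simplified stand-in for a user value paired with its last-access timestamp. */
    static final class TtlValue<V> {
        final V userValue;
        final long lastAccessTimestamp;

        TtlValue(V userValue, long lastAccessTimestamp) {
            this.userValue = userValue;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    /** Stand-in for the wrapped ("original") map state. */
    final Map<UK, TtlValue<UV>> original = new HashMap<>();

    /** Time-to-live in milliseconds (assumed configuration). */
    final long ttlMillis;

    /** Stand-in for a time provider; a mutable field keeps the example deterministic. */
    long currentTimestamp;

    TtlMapSketch(long ttlMillis, long startTimestamp) {
        this.ttlMillis = ttlMillis;
        this.currentTimestamp = startTimestamp;
    }

    TtlValue<UV> wrapWithTs(UV value) {
        return new TtlValue<>(value, currentTimestamp);
    }

    void putAll(Map<UK, UV> map) {
        if (map == null) {
            return;
        }
        // Presize with map.size(), as suggested in the review.
        Map<UK, TtlValue<UV>> ttlMap = new HashMap<>(map.size());
        for (Map.Entry<UK, UV> entry : map.entrySet()) {
            ttlMap.put(entry.getKey(), wrapWithTs(entry.getValue()));
        }
        original.putAll(ttlMap);
    }

    /** Returns the user value, or null if it is absent or expired; expired entries are removed. */
    UV get(UK key) {
        TtlValue<UV> ttlValue = original.get(key);
        if (ttlValue == null) {
            return null;
        }
        if (ttlValue.lastAccessTimestamp + ttlMillis <= currentTimestamp) {
            original.remove(key);
            return null;
        }
        return ttlValue.userValue;
    }

    public static void main(String[] args) {
        TtlMapSketch<String, Integer> state = new TtlMapSketch<>(100L, 1000L);
        Map<String, Integer> input = new HashMap<>();
        input.put("a", 1);
        input.put("b", 2);
        state.putAll(input);
        System.out.println(state.get("a")); // still fresh at t=1000
        state.currentTimestamp = 1100L;
        System.out.println(state.get("a")); // expired at t=1100 and removed
    }
}
```

Presizing avoids repeated growth from the default capacity of 16. With the default load factor of 0.75, a map presized to `n` may still resize once: the table capacity is rounded up to a power of two, and the map resizes when `n` exceeds 0.75 of that capacity. On JDK 19 and later, `HashMap.newHashMap(n)` sizes the map for `n` mappings directly.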
[GitHub] flink issue #6228: [FLINK-9491] Implement timer data structure based on Rock...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/6228 CC @azagrebin @sihuazhou ---