Repository: flink Updated Branches: refs/heads/master a144d0f77 -> e96f28bd3
[FLINK-8735] Add new StatefulJobSavepointMigrationITCase

This new test does not pretend to use legacy state but now instead uses
the more modern operator state varieties.

The binary savepoints for this were generated on the release-1.4 branch.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e96f28bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e96f28bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e96f28bd

Branch: refs/heads/master
Commit: e96f28bd381bbca6c5f7b0572c4527b9f3c7952a
Parents: 70a2e7e
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Feb 21 18:10:55 2018 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu Feb 22 15:28:20 2018 +0100

----------------------------------------------------------------------
 .../StatefulJobSavepointMigrationITCase.java | 634 +++++++++++++++++++
 .../_metadata | Bin 0 -> 44848 bytes
 .../_metadata | Bin 0 -> 44776 bytes
 3 files changed, 634 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e96f28bd/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
new file mode 100644
index 0000000..53a5353
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobSavepointMigrationITCase.java
@@ -0,0 +1,634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Migration ITCases for a stateful job. The tests are parameterized to cover
+ * migrating for multiple previous Flink versions, as well as for different state backends.
+ *
+ * <p>The job consists of two pipelines (one per source flavour) that each run
+ * {@code source -> keyed flat map -> keyed timer operator -> counting sink}. Depending on
+ * {@link ExecutionMode}, the "Checkpointing*" functions (which write state) or the
+ * "Checking*" functions (which assert on restored state) are plugged into those pipelines.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobSavepointMigrationITCase extends SavepointMigrationTestBase {
+
+	/** Total number of tuples emitted by each of the two sources. */
+	private static final int NUM_SOURCE_ELEMENTS = 4;
+
+	/**
+	 * This test runs in either of two modes: 1) we want to generate the binary savepoint, i.e.
+	 * we have to run the checkpointing functions 2) we want to verify restoring, so we have to run
+	 * the checking functions.
+	 */
+	public enum ExecutionMode {
+		PERFORM_SAVEPOINT,
+		VERIFY_SAVEPOINT
+	}
+
+	// TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+	private final ExecutionMode executionMode = ExecutionMode.VERIFY_SAVEPOINT;
+
+	/**
+	 * Supplies the (savepoint version, state backend name) combinations the test is run with.
+	 */
+	@Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+	public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+		return Arrays.asList(
+			Tuple2.of(MigrationVersion.v1_4, StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+			Tuple2.of(MigrationVersion.v1_4, StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+	}
+
+	private final MigrationVersion testMigrateVersion;
+	private final String testStateBackend;
+
+	public StatefulJobSavepointMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) {
+		this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+		this.testStateBackend = testMigrateVersionAndBackend.f1;
+	}
+
+	/**
+	 * Builds the two stateful pipelines and, depending on {@link #executionMode}, either
+	 * writes a savepoint into {@code src/test/resources} or restores from the savepoint
+	 * resource and waits for the verification accumulators to reach their expected values.
+	 */
+	@Test
+	public void testSavepoint() throws Exception {
+
+		final int parallelism = 4;
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setRestartStrategy(RestartStrategies.noRestart());
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		switch (testStateBackend) {
+			case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+				env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+				break;
+			case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
+				env.setStateBackend(new MemoryStateBackend());
+				break;
+			default:
+				throw new UnsupportedOperationException("Unsupported state backend: " + testStateBackend);
+		}
+
+		env.enableCheckpointing(500);
+		env.setParallelism(parallelism);
+		env.setMaxParallelism(parallelism);
+
+		SourceFunction<Tuple2<Long, Long>> nonParallelSource;
+		SourceFunction<Tuple2<Long, Long>> parallelSource;
+		RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap;
+		OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator;
+
+		if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) {
+			nonParallelSource = new CheckpointingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+			parallelSource = new CheckpointingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+			flatMap = new CheckpointingKeyedStateFlatMap();
+			timelyOperator = new CheckpointingTimelyStatefulOperator();
+		} else if (executionMode == ExecutionMode.VERIFY_SAVEPOINT) {
+			nonParallelSource = new CheckingNonParallelSourceWithListState(NUM_SOURCE_ELEMENTS);
+			parallelSource = new CheckingParallelSourceWithUnionListState(NUM_SOURCE_ELEMENTS);
+			flatMap = new CheckingKeyedStateFlatMap();
+			timelyOperator = new CheckingTimelyStatefulOperator();
+		} else {
+			throw new IllegalStateException("Unknown ExecutionMode " + executionMode);
+		}
+
+		// Both pipelines share the same function instances; they only differ in source and uids.
+		addStatefulPipeline(env, nonParallelSource, flatMap, timelyOperator, 1);
+		addStatefulPipeline(env, parallelSource, flatMap, timelyOperator, 2);
+
+		if (executionMode == ExecutionMode.PERFORM_SAVEPOINT) {
+			executeAndSavepoint(
+				env,
+				"src/test/resources/" + getSavepointPath(testMigrateVersion, testStateBackend),
+				new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2));
+		} else {
+			restoreAndExecute(
+				env,
+				getResourceFilename(getSavepointPath(testMigrateVersion, testStateBackend)),
+				// the non-parallel source reports one successful restore ...
+				new Tuple2<>(CheckingNonParallelSourceWithListState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, 1),
+				// ... while the parallel source reports one per parallel subtask
+				new Tuple2<>(CheckingParallelSourceWithUnionListState.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, parallelism),
+				// every remaining check fires once per element, in each of the two pipelines
+				new Tuple2<>(CheckingKeyedStateFlatMap.SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2),
+				new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2),
+				new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2),
+				new Tuple2<>(CheckingTimelyStatefulOperator.SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2),
+				new Tuple2<>(AccumulatorCountingSink.NUM_ELEMENTS_ACCUMULATOR, NUM_SOURCE_ELEMENTS * 2));
+		}
+	}
+
+	/**
+	 * Adds one {@code source -> flat map -> timer operator -> sink} pipeline to the job.
+	 *
+	 * <p>The operator uids are {@code "CheckpointingSource" + pipelineIndex},
+	 * {@code "CheckpointingKeyedStateFlatMap" + pipelineIndex} and
+	 * {@code "CheckpointingTimelyStatefulOperator" + pipelineIndex}. The same uids are used in
+	 * both execution modes so that the job that restores a savepoint assigns the same uids
+	 * as the job that wrote it.
+	 *
+	 * @param env the environment to add the pipeline to
+	 * @param source the source feeding the pipeline
+	 * @param flatMap the keyed flat map function
+	 * @param timelyOperator the keyed operator that works with timers
+	 * @param pipelineIndex suffix appended to the operator uids
+	 */
+	private static void addStatefulPipeline(
+			StreamExecutionEnvironment env,
+			SourceFunction<Tuple2<Long, Long>> source,
+			RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> flatMap,
+			OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>> timelyOperator,
+			int pipelineIndex) {
+
+		env
+			.addSource(source).uid("CheckpointingSource" + pipelineIndex)
+			.keyBy(0)
+			.flatMap(flatMap).startNewChain().uid("CheckpointingKeyedStateFlatMap" + pipelineIndex)
+			.keyBy(0)
+			.transform(
+				"timely_stateful_operator",
+				new TypeHint<Tuple2<Long, Long>>() {}.getTypeInfo(),
+				timelyOperator).uid("CheckpointingTimelyStatefulOperator" + pipelineIndex)
+			.addSink(new AccumulatorCountingSink<>());
+	}
+
+	/**
+	 * Returns the name of the savepoint directory (relative to the test resources) for the
+	 * given savepoint version and state backend.
+	 *
+	 * @throws UnsupportedOperationException if {@code backendType} is neither the RocksDB nor
+	 *     the memory state backend name
+	 */
+	private String getSavepointPath(MigrationVersion savepointVersion, String backendType) {
+		switch (backendType) {
+			case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+				return "new-stateful-udf-migration-itcase-flink" + savepointVersion + "-rocksdb-savepoint";
+			case StateBackendLoader.MEMORY_STATE_BACKEND_NAME:
+				return "new-stateful-udf-migration-itcase-flink" + savepointVersion + "-savepoint";
+			default:
+				throw new UnsupportedOperationException("Unsupported state backend: " + backendType);
+		}
+	}
+
+	/**
+	 * Source used when writing the savepoint: emits the tuples {@code (i, i)} for
+	 * {@code i} in {@code [0, numElements)} and snapshots four fixed strings into operator
+	 * list state.
+	 */
+	private static class CheckpointingNonParallelSourceWithListState
+			implements SourceFunction<Tuple2<Long, Long>>, CheckpointedFunction {
+
+		static final ListStateDescriptor<String> STATE_DESCRIPTOR =
+			new ListStateDescriptor<>("source-state", StringSerializer.INSTANCE);
+
+		static final String CHECKPOINTED_STRING = "Here be dragons!";
+		static final String CHECKPOINTED_STRING_1 = "Here be more dragons!";
+		static final String CHECKPOINTED_STRING_2 = "Here be yet more dragons!";
+		static final String CHECKPOINTED_STRING_3 = "Here be the mostest dragons!";
+
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean isRunning = true;
+
+		private final int numElements;
+
+		// plain (non-union) operator list state, obtained via getListState()
+		private transient ListState<String> listState;
+
+		CheckpointingNonParallelSourceWithListState(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+			// always snapshot exactly the four well-known strings
+			listState.clear();
+			listState.add(CHECKPOINTED_STRING);
+			listState.add(CHECKPOINTED_STRING_1);
+			listState.add(CHECKPOINTED_STRING_2);
+			listState.add(CHECKPOINTED_STRING_3);
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+			listState = context.getOperatorStateStore().getListState(
+				STATE_DESCRIPTOR);
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+			ctx.emitWatermark(new Watermark(0));
+
+			synchronized (ctx.getCheckpointLock()) {
+				for (long i = 0; i < numElements; i++) {
+					ctx.collect(new Tuple2<>(i, i));
+				}
+			}
+
+			// don't emit a final watermark so that we don't trigger the registered event-time
+			// timers
+			while (isRunning) {
+				Thread.sleep(20);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+	/**
+	 * Source used when verifying the savepoint: asserts in {@link #initializeState} that the
+	 * restored list state holds the four strings written by
+	 * {@link CheckpointingNonParallelSourceWithListState}, then emits the same tuples again.
+	 */
+	private static class CheckingNonParallelSourceWithListState
+			extends RichSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction {
+
+		private static final long serialVersionUID = 1L;
+
+		static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingNonParallelSourceWithListState.class + "_RESTORE_CHECK";
+
+		private volatile boolean isRunning = true;
+
+		private final int numElements;
+
+		CheckingNonParallelSourceWithListState(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+			// nothing to snapshot, this function only verifies restored state
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+			ListState<String> listState = context.getOperatorStateStore().getListState(
+				CheckpointingNonParallelSourceWithListState.STATE_DESCRIPTOR);
+
+			if (context.isRestored()) {
+				assertThat(listState.get(),
+					containsInAnyOrder(
+						CheckpointingNonParallelSourceWithListState.CHECKPOINTED_STRING,
+						CheckpointingNonParallelSourceWithListState.CHECKPOINTED_STRING_1,
+						CheckpointingNonParallelSourceWithListState.CHECKPOINTED_STRING_2,
+						CheckpointingNonParallelSourceWithListState.CHECKPOINTED_STRING_3));
+
+				getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+				getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+			} else {
+				throw new RuntimeException(
+					"This source should always be restored because it's only used when restoring from a savepoint.");
+			}
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+			// immediately trigger any set timers
+			ctx.emitWatermark(new Watermark(1000));
+
+			synchronized (ctx.getCheckpointLock()) {
+				for (long i = 0; i < numElements; i++) {
+					ctx.collect(new Tuple2<>(i, i));
+				}
+			}
+
+			while (isRunning) {
+				Thread.sleep(20);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+	/**
+	 * Source used when writing the savepoint: distributes element emission and the
+	 * snapshotted strings over the subtasks by index and stores the strings in union list
+	 * state.
+	 *
+	 * <p>NOTE(review): despite its name this class extends {@link RichSourceFunction}, not
+	 * {@link RichParallelSourceFunction} (which the matching "Checking" source uses). Confirm
+	 * whether that is intended before regenerating savepoints; it is deliberately left
+	 * unchanged here.
+	 */
+	private static class CheckpointingParallelSourceWithUnionListState
+			extends RichSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction {
+
+		static final ListStateDescriptor<String> STATE_DESCRIPTOR =
+			new ListStateDescriptor<>("source-state", StringSerializer.INSTANCE);
+
+		static final String[] CHECKPOINTED_STRINGS = {
+			"Here be dragons!",
+			"Here be more dragons!",
+			"Here be yet more dragons!",
+			"Here be the mostest dragons!"
+		};
+
+		private static final long serialVersionUID = 1L;
+
+		private volatile boolean isRunning = true;
+
+		private final int numElements;
+
+		private transient ListState<String> unionListState;
+
+		CheckpointingParallelSourceWithUnionListState(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+			unionListState.clear();
+
+			final int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
+			final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
+
+			for (String s : CHECKPOINTED_STRINGS) {
+				// String.hashCode() may be negative; floorMod keeps the bucket in
+				// [0, numSubtasks) so that every string is owned by exactly one subtask.
+				if (Math.floorMod(s.hashCode(), numSubtasks) == subtaskIndex) {
+					unionListState.add(s);
+				}
+			}
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+			unionListState = context.getOperatorStateStore().getUnionListState(
+				STATE_DESCRIPTOR);
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+			ctx.emitWatermark(new Watermark(0));
+
+			synchronized (ctx.getCheckpointLock()) {
+				for (long i = 0; i < numElements; i++) {
+					// each subtask only emits the elements whose index maps to it
+					if (i % getRuntimeContext().getNumberOfParallelSubtasks() == getRuntimeContext().getIndexOfThisSubtask()) {
+						ctx.collect(new Tuple2<>(i, i));
+					}
+				}
+			}
+
+			// don't emit a final watermark so that we don't trigger the registered event-time
+			// timers
+			while (isRunning) {
+				Thread.sleep(20);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+	/**
+	 * Parallel source used when verifying the savepoint: each subtask asserts in
+	 * {@link #initializeState} that the restored union list state holds all of
+	 * {@link CheckpointingParallelSourceWithUnionListState#CHECKPOINTED_STRINGS}, then emits
+	 * its share of the tuples.
+	 */
+	private static class CheckingParallelSourceWithUnionListState
+			extends RichParallelSourceFunction<Tuple2<Long, Long>> implements CheckpointedFunction {
+
+		private static final long serialVersionUID = 1L;
+
+		static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingParallelSourceWithUnionListState.class + "_RESTORE_CHECK";
+
+		private volatile boolean isRunning = true;
+
+		private final int numElements;
+
+		CheckingParallelSourceWithUnionListState(int numElements) {
+			this.numElements = numElements;
+		}
+
+		@Override
+		public void snapshotState(FunctionSnapshotContext context) throws Exception {
+			// nothing to snapshot, this function only verifies restored state
+		}
+
+		@Override
+		public void initializeState(FunctionInitializationContext context) throws Exception {
+			// use the descriptor of the source that actually wrote this state
+			ListState<String> unionListState = context.getOperatorStateStore().getUnionListState(
+				CheckpointingParallelSourceWithUnionListState.STATE_DESCRIPTOR);
+
+			if (context.isRestored()) {
+				assertThat(unionListState.get(),
+					containsInAnyOrder(CheckpointingParallelSourceWithUnionListState.CHECKPOINTED_STRINGS));
+
+				getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+				getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+			} else {
+				throw new RuntimeException(
+					"This source should always be restored because it's only used when restoring from a savepoint.");
+			}
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+
+			// immediately trigger any set timers
+			ctx.emitWatermark(new Watermark(1000));
+
+			synchronized (ctx.getCheckpointLock()) {
+				for (long i = 0; i < numElements; i++) {
+					// each subtask only emits the elements whose index maps to it
+					if (i % getRuntimeContext().getNumberOfParallelSubtasks() == getRuntimeContext().getIndexOfThisSubtask()) {
+						ctx.collect(new Tuple2<>(i, i));
+					}
+				}
+			}
+
+			while (isRunning) {
+				Thread.sleep(20);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+	/**
+	 * Flat map used when writing the savepoint: forwards every element and stores its
+	 * {@code f1} field in keyed value state.
+	 */
+	private static class CheckpointingKeyedStateFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+			new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			getRuntimeContext().getState(stateDescriptor).update(value.f1);
+		}
+	}
+
+	/**
+	 * Flat map used when verifying the savepoint: forwards every element and asserts that the
+	 * keyed value state equals the element's {@code f1} field.
+	 */
+	private static class CheckingKeyedStateFlatMap extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+		private static final long serialVersionUID = 1L;
+
+		static final String SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR = CheckingKeyedStateFlatMap.class + "_RESTORE_CHECK";
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+			new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
+			out.collect(value);
+
+			ValueState<Long> state = getRuntimeContext().getState(stateDescriptor);
+			if (state == null) {
+				throw new RuntimeException("Missing key value state for " + value);
+			}
+
+			assertEquals(value.f1, state.value());
+			getRuntimeContext().getAccumulator(SUCCESSFUL_RESTORE_CHECK_ACCUMULATOR).add(1);
+		}
+	}
+
+	/**
+	 * Operator used when writing the savepoint: for every element it stores {@code f1} in
+	 * keyed state (using {@code f0} as namespace) and registers one event-time and one
+	 * processing-time timer in that namespace. The timer callbacks do nothing.
+	 */
+	private static class CheckpointingTimelyStatefulOperator
+			extends AbstractStreamOperator<Tuple2<Long, Long>>
+			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>, Triggerable<Long, Long> {
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+			new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);
+
+		private transient InternalTimerService<Long> timerService;
+
+		@Override
+		public void open() throws Exception {
+			super.open();
+
+			timerService = getInternalTimerService(
+				"timer",
+				LongSerializer.INSTANCE,
+				this);
+
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+			ValueState<Long> state = getKeyedStateBackend().getPartitionedState(
+				element.getValue().f0,
+				LongSerializer.INSTANCE,
+				stateDescriptor);
+
+			state.update(element.getValue().f1);
+
+			timerService.registerEventTimeTimer(element.getValue().f0, timerService.currentWatermark() + 10);
+			timerService.registerProcessingTimeTimer(element.getValue().f0, timerService.currentProcessingTime() + 30_000);
+
+			output.collect(element);
+		}
+
+		@Override
+		public void onEventTime(InternalTimer<Long, Long> timer) {
+			// timers are only registered here; they are verified by the checking operator
+		}
+
+		@Override
+		public void onProcessingTime(InternalTimer<Long, Long> timer) {
+			// timers are only registered here; they are verified by the checking operator
+		}
+
+		@Override
+		public void processWatermark(Watermark mark) {
+			output.emitWatermark(mark);
+		}
+	}
+
+	/**
+	 * Operator used when verifying the savepoint: asserts for every element and for every
+	 * firing timer that the keyed state holds the expected value, counting each successful
+	 * check in a dedicated accumulator.
+	 */
+	private static class CheckingTimelyStatefulOperator
+			extends AbstractStreamOperator<Tuple2<Long, Long>>
+			implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple2<Long, Long>>, Triggerable<Long, Long> {
+		private static final long serialVersionUID = 1L;
+
+		static final String SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_PROCESS_CHECKS";
+		static final String SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_ET_CHECKS";
+		static final String SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR = CheckingTimelyStatefulOperator.class + "_PT_CHECKS";
+
+		private final ValueStateDescriptor<Long> stateDescriptor =
+			new ValueStateDescriptor<>("state-name", LongSerializer.INSTANCE);
+
+		@Override
+		public void open() throws Exception {
+			super.open();
+
+			// have to re-register to ensure that our onEventTime() is called
+			getInternalTimerService(
+				"timer",
+				LongSerializer.INSTANCE,
+				this);
+
+			getRuntimeContext().addAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR, new IntCounter());
+			getRuntimeContext().addAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR, new IntCounter());
+			getRuntimeContext().addAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void processElement(StreamRecord<Tuple2<Long, Long>> element) throws Exception {
+			ValueState<Long> state = getKeyedStateBackend().getPartitionedState(
+				element.getValue().f0,
+				LongSerializer.INSTANCE,
+				stateDescriptor);
+
+			// expected value first, so that a failure message reads correctly
+			assertEquals(element.getValue().f1, state.value());
+			getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESS_CHECK_ACCUMULATOR).add(1);
+
+			output.collect(element);
+		}
+
+		@Override
+		public void onEventTime(InternalTimer<Long, Long> timer) throws Exception {
+			ValueState<Long> state = getKeyedStateBackend().getPartitionedState(
+				timer.getNamespace(),
+				LongSerializer.INSTANCE,
+				stateDescriptor);
+
+			// the sources emit (i, i), so the stored value equals the timer namespace
+			assertEquals(timer.getNamespace(), state.value());
+			getRuntimeContext().getAccumulator(SUCCESSFUL_EVENT_TIME_CHECK_ACCUMULATOR).add(1);
+		}
+
+		@Override
+		public void onProcessingTime(InternalTimer<Long, Long> timer) throws Exception {
+			ValueState<Long> state = getKeyedStateBackend().getPartitionedState(
+				timer.getNamespace(),
+				LongSerializer.INSTANCE,
+				stateDescriptor);
+
+			// the sources emit (i, i), so the stored value equals the timer namespace
+			assertEquals(timer.getNamespace(), state.value());
+			getRuntimeContext().getAccumulator(SUCCESSFUL_PROCESSING_TIME_CHECK_ACCUMULATOR).add(1);
+		}
+	}
+
+	/**
+	 * Sink that counts every received element in the {@link #NUM_ELEMENTS_ACCUMULATOR}
+	 * accumulator.
+	 */
+	private static class AccumulatorCountingSink<T> extends RichSinkFunction<T> {
+		private static final long serialVersionUID = 1L;
+
+		static final String NUM_ELEMENTS_ACCUMULATOR = AccumulatorCountingSink.class + "_NUM_ELEMENTS";
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			getRuntimeContext().addAccumulator(NUM_ELEMENTS_ACCUMULATOR, new IntCounter());
+		}
+
+		@Override
+		public void invoke(T value, Context context) throws Exception {
+			getRuntimeContext().getAccumulator(NUM_ELEMENTS_ACCUMULATOR).add(1);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e96f28bd/flink-tests/src/test/resources/new-stateful-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/new-stateful-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata b/flink-tests/src/test/resources/new-stateful-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata
new file mode 100644
index 0000000..d7c2783
Binary files /dev/null and b/flink-tests/src/test/resources/new-stateful-udf-migration-itcase-flink1.4-rocksdb-savepoint/_metadata differ http://git-wip-us.apache.org/repos/asf/flink/blob/e96f28bd/flink-tests/src/test/resources/new-stateful-udf-migration-itcase-flink1.4-savepoint/_metadata ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/resources/new-stateful-udf-migration-itcase-flink1.4-savepoint/_metadata b/flink-tests/src/test/resources/new-stateful-udf-migration-itcase-flink1.4-savepoint/_metadata new file mode 100644 index 0000000..ddc9bcd Binary files /dev/null and b/flink-tests/src/test/resources/new-stateful-udf-migration-itcase-flink1.4-savepoint/_metadata differ