This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push: new 0dda6fe [FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators 0dda6fe is described below commit 0dda6fe9dff4f667b110cda39bfe9738ba615b24 Author: Congxian Qiu <qcx978132...@gmail.com> AuthorDate: Mon May 13 16:29:34 2019 +0800 [FLINK-12296][StateBackend] Fix local state directory collision with state loss for chained keyed operators This closes #8338. --- .../streaming/state/RocksDBKeyedStateBackend.java | 16 +- .../tasks/OneInputStreamTaskTestHarness.java | 43 +++- .../runtime/tasks/StreamConfigChainer.java | 23 +- .../runtime/tasks/StreamMockEnvironment.java | 8 +- .../runtime/tasks/StreamTaskTestHarness.java | 21 +- .../state/StatefulOperatorChainedTaskTest.java | 259 +++++++++++++++++++++ 6 files changed, 357 insertions(+), 13 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 17ba985..62c540f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -270,6 +270,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { /** Shared wrapper for batch writes to the RocksDB instance. */ private RocksDBWriteBatchWrapper writeBatchWrapper; + /** The local directory name of the current snapshot strategy. */ + private final String localDirectoryName; + public RocksDBKeyedStateBackend( String operatorIdentifier, ClassLoader userCodeClassLoader, @@ -319,6 +322,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.restoredKvStateMetaInfos = new HashMap<>(); this.materializedSstFiles = new TreeMap<>(); this.backendUID = UUID.randomUUID(); + this.localDirectoryName = this.backendUID.toString().replaceAll("[\\-]", ""); this.snapshotStrategy = enableIncrementalCheckpointing ? new IncrementalSnapshotStrategy() : @@ -1977,17 +1981,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider(); File directory = directoryProvider.subtaskSpecificCheckpointDirectory(checkpointId); - if (directory.exists()) { - FileUtils.deleteDirectory(directory); - } - - if (!directory.mkdirs()) { + if (!directory.exists() && !directory.mkdirs()) { throw new IOException("Local state base directory for checkpoint " + checkpointId + " already exists: " + directory); } // introduces an extra directory because RocksDB wants a non-existing directory for native checkpoints. - File rdbSnapshotDir = new File(directory, "rocks_db"); + // append localDirectoryName here to solve directory collision problem when two stateful operators chained in one task. + File rdbSnapshotDir = new File(directory, localDirectoryName); + if (rdbSnapshotDir.exists()) { + FileUtils.deleteDirectory(rdbSnapshotDir); + } Path path = new Path(rdbSnapshotDir.toURI()); // create a "permanent" snapshot directory because local recovery is active. snapshotDirectory = SnapshotDirectory.permanent(path); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java index 89a4f81..0a7efda 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java @@ -25,7 +25,10 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate; +import org.apache.flink.runtime.state.LocalRecoveryConfig; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; +import java.io.File; import java.io.IOException; import java.util.function.Function; @@ -56,16 +59,48 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes /** * Creates a test harness with the specified number of input gates and specified number - * of channels per input gate. + * of channels per input gate and local recovery disabled. + */ + public OneInputStreamTaskTestHarness( + Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory, + int numInputGates, + int numInputChannelsPerGate, + TypeInformation<IN> inputType, + TypeInformation<OUT> outputType) { + this(taskFactory, numInputGates, numInputChannelsPerGate, inputType, outputType, TestLocalRecoveryConfig.disabled()); + } + + public OneInputStreamTaskTestHarness( + Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory, + int numInputGates, + int numInputChannelsPerGate, + TypeInformation<IN> inputType, + TypeInformation<OUT> outputType, + File localRootDir) { + super(taskFactory, outputType, localRootDir); + + this.inputType = inputType; + inputSerializer = inputType.createSerializer(executionConfig); + + this.numInputGates = numInputGates; + this.numInputChannelsPerGate = numInputChannelsPerGate; + + streamConfig.setStateKeySerializer(inputSerializer); + } + + /** + * Creates a test harness with the specified number of input gates and specified number + * of channels per input gate and specified localRecoveryConfig. */ public OneInputStreamTaskTestHarness( Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory, int numInputGates, int numInputChannelsPerGate, TypeInformation<IN> inputType, - TypeInformation<OUT> outputType) { + TypeInformation<OUT> outputType, + LocalRecoveryConfig localRecoveryConfig) { - super(taskFactory, outputType); + super(taskFactory, outputType, localRecoveryConfig); this.inputType = inputType; inputSerializer = inputType.createSerializer(executionConfig); @@ -82,7 +117,7 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes TypeInformation<IN> inputType, TypeInformation<OUT> outputType) { - this(taskFactory, 1, 1, inputType, outputType); + this(taskFactory, 1, 1, inputType, outputType, TestLocalRecoveryConfig.disabled()); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java index 10e50ce..e19e8d2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java @@ -67,11 +67,28 @@ public class StreamConfigChainer { return chain(operatorID, operator, typeSerializer, typeSerializer); } + public <T> StreamConfigChainer chain( + OperatorID operatorID, + OneInputStreamOperator<T, T> operator, + TypeSerializer<T> typeSerializer, + boolean createKeyedStateBackend) { + return chain(operatorID, operator, typeSerializer, typeSerializer, createKeyedStateBackend); + } + + public <IN, OUT> StreamConfigChainer chain( + OperatorID operatorID, + OneInputStreamOperator<IN, OUT> operator, + TypeSerializer<IN> inputSerializer, + TypeSerializer<OUT> outputSerializer) { + return chain(operatorID, operator, inputSerializer, outputSerializer, false); + } + public <IN, OUT> StreamConfigChainer chain( OperatorID operatorID, OneInputStreamOperator<IN, OUT> operator, TypeSerializer<IN> inputSerializer, - TypeSerializer<OUT> outputSerializer) { + TypeSerializer<OUT> outputSerializer, + boolean createKeyedStateBackend) { chainIndex++; tailConfig.setChainedOutputs(Collections.singletonList( @@ -88,6 +105,10 @@ public class StreamConfigChainer { tailConfig.setTypeSerializerIn1(inputSerializer); tailConfig.setTypeSerializerOut(outputSerializer); tailConfig.setChainIndex(chainIndex); + if (createKeyedStateBackend) { + // used to test multiple stateful operators chained in a single task. + tailConfig.setStateKeySerializer(inputSerializer); + } chainedConfigs.put(chainIndex, tailConfig); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index 32de8d5..8f191b9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -102,6 +102,8 @@ public class StreamMockEnvironment implements Environment { private TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class); + private TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo(); + public StreamMockEnvironment( Configuration jobConfig, Configuration taskConfig, @@ -322,7 +324,11 @@ public class StreamMockEnvironment implements Environment { @Override public TaskManagerRuntimeInfo getTaskManagerInfo() { - return new TestingTaskManagerRuntimeInfo(); + return this.taskManagerRuntimeInfo; + } + + public void setTaskManagerInfo(TaskManagerRuntimeInfo taskManagerRuntimeInfo) { + this.taskManagerRuntimeInfo = taskManagerRuntimeInfo; } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index b2f1b99..f46f91b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; @@ -26,10 +27,14 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.LocalRecoveryConfig; +import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl; +import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.collector.selector.OutputSelector; @@ -47,6 +52,7 @@ import org.apache.flink.util.Preconditions; import org.junit.Assert; +import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.LinkedList; @@ -109,7 +115,20 @@ public class StreamTaskTestHarness<OUT> { public StreamTaskTestHarness( Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory, TypeInformation<OUT> outputType) { + this(taskFactory, outputType, TestLocalRecoveryConfig.disabled()); + } + public StreamTaskTestHarness( + Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory, + TypeInformation<OUT> outputType, + File localRootDir) { + this(taskFactory, outputType, new LocalRecoveryConfig(true, new LocalRecoveryDirectoryProviderImpl(localRootDir, new JobID(), new JobVertexID(), 0))); + } + + public StreamTaskTestHarness( + Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory, + TypeInformation<OUT> outputType, + LocalRecoveryConfig localRecoveryConfig) { this.taskFactory = checkNotNull(taskFactory); this.memorySize = DEFAULT_MEMORY_MANAGER_SIZE; this.bufferSize = DEFAULT_NETWORK_BUFFER_SIZE; @@ -123,7 +142,7 @@ public class StreamTaskTestHarness<OUT> { outputSerializer = outputType.createSerializer(executionConfig); outputStreamRecordSerializer = new StreamElementSerializer<OUT>(outputSerializer); - this.taskStateManager = new TestTaskStateManager(); + this.taskStateManager = new TestTaskStateManager(localRecoveryConfig); } public ProcessingTimeService getProcessingTimeService() { diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java new file mode 100644 index 0000000..d36b5e2 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/state/StatefulOperatorChainedTaskTest.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.state; + +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.TestTaskStateManager; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; +import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; +import org.apache.flink.streaming.util.TestHarnessUtil; + +import org.junit.Before; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY; +import static org.apache.flink.configuration.CheckpointingOptions.INCREMENTAL_CHECKPOINTS; +import static org.apache.flink.configuration.CheckpointingOptions.STATE_BACKEND; +import static org.junit.Assert.assertEquals; + +/** + * Test for StatefulOperatorChainedTaskTest. + */ +public class StatefulOperatorChainedTaskTest { + + private static final Set<OperatorID> RESTORED_OPERATORS = ConcurrentHashMap.newKeySet(); + private TemporaryFolder temporaryFolder; + + @Before + public void setup() throws IOException { + RESTORED_OPERATORS.clear(); + temporaryFolder = new TemporaryFolder(); + temporaryFolder.create(); + } + + @Test + public void testMultipleStatefulOperatorChainedSnapshotAndRestore() throws Exception { + + OperatorID headOperatorID = new OperatorID(42L, 42L); + OperatorID tailOperatorID = new OperatorID(44L, 44L); + + JobManagerTaskRestore restore = createRunAndCheckpointOperatorChain( + headOperatorID, + new CounterOperator("head"), + tailOperatorID, + new CounterOperator("tail"), + Optional.empty()); + + TaskStateSnapshot stateHandles = restore.getTaskStateSnapshot(); + + assertEquals(2, stateHandles.getSubtaskStateMappings().size()); + + createRunAndCheckpointOperatorChain( + headOperatorID, + new CounterOperator("head"), + tailOperatorID, + new CounterOperator("tail"), + Optional.of(restore)); + + assertEquals(new HashSet<>(Arrays.asList(headOperatorID, tailOperatorID)), RESTORED_OPERATORS); + } + + private JobManagerTaskRestore createRunAndCheckpointOperatorChain( + OperatorID headId, + OneInputStreamOperator<String, String> headOperator, + OperatorID tailId, + OneInputStreamOperator<String, String> tailOperator, + Optional<JobManagerTaskRestore> restore) throws Exception { + + File localRootDir = temporaryFolder.newFolder(); + final OneInputStreamTaskTestHarness<String, String> testHarness = + new OneInputStreamTaskTestHarness<>( + OneInputStreamTask::new, + 1, 1, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + localRootDir); + + testHarness.setupOperatorChain(headId, headOperator) + .chain(tailId, tailOperator, StringSerializer.INSTANCE, true) + .finish(); + + if (restore.isPresent()) { + JobManagerTaskRestore taskRestore = restore.get(); + testHarness.setTaskStateSnapshot( + taskRestore.getRestoreCheckpointId(), + taskRestore.getTaskStateSnapshot()); + } + + StreamMockEnvironment environment = new StreamMockEnvironment( + testHarness.jobConfig, + testHarness.taskConfig, + testHarness.getExecutionConfig(), + testHarness.memorySize, + new MockInputSplitProvider(), + testHarness.bufferSize, + testHarness.getTaskStateManager()); + + Configuration configuration = new Configuration(); + configuration.setString(STATE_BACKEND.key(), "rocksdb"); + File file = temporaryFolder.newFolder(); + configuration.setString(CHECKPOINTS_DIRECTORY.key(), file.toURI().toString()); + configuration.setString(INCREMENTAL_CHECKPOINTS.key(), "true"); + environment.setTaskManagerInfo( + new TestingTaskManagerRuntimeInfo( + configuration, + System.getProperty("java.io.tmpdir").split(",|" + File.pathSeparator))); + testHarness.invoke(environment); + testHarness.waitForTaskRunning(); + + OneInputStreamTask<String, String> streamTask = testHarness.getTask(); + + processRecords(testHarness); + triggerCheckpoint(testHarness, streamTask); + + TestTaskStateManager taskStateManager = testHarness.getTaskStateManager(); + + JobManagerTaskRestore jobManagerTaskRestore = new JobManagerTaskRestore( + taskStateManager.getReportedCheckpointId(), + taskStateManager.getLastJobManagerTaskStateSnapshot()); + + testHarness.endInput(); + testHarness.waitForTaskCompletion(); + return jobManagerTaskRestore; + } + + private void triggerCheckpoint( + OneInputStreamTaskTestHarness<String, String> testHarness, + OneInputStreamTask<String, String> streamTask) throws Exception { + + long checkpointId = 1L; + CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 1L); + + testHarness.getTaskStateManager().setWaitForReportLatch(new OneShotLatch()); + + while (!streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation())) {} + + testHarness.getTaskStateManager().getWaitForReportLatch().await(); + long reportedCheckpointId = testHarness.getTaskStateManager().getReportedCheckpointId(); + + assertEquals(checkpointId, reportedCheckpointId); + } + + private void processRecords(OneInputStreamTaskTestHarness<String, String> testHarness) throws Exception { + ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); + + testHarness.processElement(new StreamRecord<>("10"), 0, 0); + testHarness.processElement(new StreamRecord<>("20"), 0, 0); + testHarness.processElement(new StreamRecord<>("30"), 0, 0); + + testHarness.waitForInputProcessing(); + + expectedOutput.add(new StreamRecord<>("10")); + expectedOutput.add(new StreamRecord<>("20")); + expectedOutput.add(new StreamRecord<>("30")); + TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput()); + } + + private abstract static class RestoreWatchOperator<IN, OUT> + extends AbstractStreamOperator<OUT> + implements OneInputStreamOperator<IN, OUT> { + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + if (context.isRestored()) { + RESTORED_OPERATORS.add(getOperatorID()); + } + } + } + + /** + * Operator that counts processed messages and keeps result on state. + */ + private static class CounterOperator extends RestoreWatchOperator<String, String> { + private static final long serialVersionUID = 2048954179291813243L; + + private static long snapshotOutData = 0L; + private ValueState<Long> counterState; + private long counter = 0; + private String prefix; + + CounterOperator(String prefix) { + this.prefix = prefix; + } + + @Override + public void processElement(StreamRecord<String> element) throws Exception { + counter++; + output.collect(element); + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + counterState = context + .getKeyedStateStore() + .getState(new ValueStateDescriptor<>(prefix + "counter-state", LongSerializer.INSTANCE)); + + // set key manually to make RocksDBListState get the serialized key. + setCurrentKey("10"); + + if (context.isRestored()) { + counter = counterState.value(); + assertEquals(snapshotOutData, counter); + counterState.clear(); + } + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + counterState.update(counter); + snapshotOutData = counter; + } + } +}