Repository: flink Updated Branches: refs/heads/master 3d52f52e9 -> 2f7392d77
http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java new file mode 100644 index 0000000..3a39ba0 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -0,0 +1,416 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; +import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; +import org.apache.flink.util.IOUtils; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.CompactionStyle; +import org.rocksdb.DBOptions; + +import java.io.File; + +import static org.hamcrest.CoreMatchers.anyOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.startsWith; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for configuring the RocksDB State Backend. 
+ */ +@SuppressWarnings("serial") +public class RocksDBStateBackendConfigTest { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + // ------------------------------------------------------------------------ + // RocksDB local file directory + // ------------------------------------------------------------------------ + + @Test + public void testDefaultsInSync() throws Exception { + final boolean defaultIncremental = CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue(); + + RocksDBStateBackend backend = new RocksDBStateBackend(tempFolder.newFolder().toURI()); + assertEquals(defaultIncremental, backend.isIncrementalCheckpointsEnabled()); + } + + @Test + public void testSetDbPath() throws Exception { + String checkpointPath = tempFolder.newFolder().toURI().toString(); + File testDir1 = tempFolder.newFolder(); + File testDir2 = tempFolder.newFolder(); + + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + + assertNull(rocksDbBackend.getDbStoragePaths()); + + rocksDbBackend.setDbStoragePath(testDir1.getAbsolutePath()); + assertArrayEquals(new String[] { new Path(testDir1.getAbsolutePath()).toString() }, rocksDbBackend.getDbStoragePaths()); + + rocksDbBackend.setDbStoragePath(null); + assertNull(rocksDbBackend.getDbStoragePaths()); + + rocksDbBackend.setDbStoragePaths(testDir1.getAbsolutePath(), testDir2.getAbsolutePath()); + assertArrayEquals(new String[] { new Path(testDir1.getAbsolutePath()).toString(), new Path(testDir2.getAbsolutePath()).toString() }, rocksDbBackend.getDbStoragePaths()); + + Environment env = getMockEnvironment(); + RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend. 
+ createKeyedStateBackend( + env, + env.getJobID(), + "test_op", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + env.getTaskKvStateRegistry()); + + try { + File instanceBasePath = keyedBackend.getInstanceBasePath(); + assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath()))); + + //noinspection NullArgumentToVariableArgMethod + rocksDbBackend.setDbStoragePaths(null); + assertNull(rocksDbBackend.getDbStoragePaths()); + } finally { + IOUtils.closeQuietly(keyedBackend); + keyedBackend.dispose(); + } + } + + @Test(expected = IllegalArgumentException.class) + public void testSetNullPaths() throws Exception { + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + rocksDbBackend.setDbStoragePaths(); + } + + @Test(expected = IllegalArgumentException.class) + public void testNonFileSchemePath() throws Exception { + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition"); + } + + // ------------------------------------------------------------------------ + // RocksDB local file automatic from temp directories + // ------------------------------------------------------------------------ + + /** + * This tests whether the RocksDB backend uses the temp directories that are provided + * from the {@link Environment} when no db storage path is set. 
+ */ + @Test + public void testUseTempDirectories() throws Exception { + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + + File dir1 = tempFolder.newFolder(); + File dir2 = tempFolder.newFolder(); + + File[] tempDirs = new File[] { dir1, dir2 }; + + assertNull(rocksDbBackend.getDbStoragePaths()); + + Environment env = getMockEnvironment(tempDirs); + RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend. + createKeyedStateBackend( + env, + env.getJobID(), + "test_op", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + env.getTaskKvStateRegistry()); + + try { + File instanceBasePath = keyedBackend.getInstanceBasePath(); + assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath()))); + } finally { + IOUtils.closeQuietly(keyedBackend); + keyedBackend.dispose(); + } + } + + // ------------------------------------------------------------------------ + // RocksDB local file directory initialization + // ------------------------------------------------------------------------ + + @Test + public void testFailWhenNoLocalStorageDir() throws Exception { + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + File targetDir = tempFolder.newFolder(); + + try { + if (!targetDir.setWritable(false, false)) { + System.err.println("Cannot execute 'testFailWhenNoLocalStorageDir' because cannot mark directory non-writable"); + return; + } + + rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath()); + + boolean hasFailure = false; + try { + Environment env = getMockEnvironment(); + rocksDbBackend.createKeyedStateBackend( + env, + env.getJobID(), + "foobar", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + new KvStateRegistry().createTaskRegistry(env.getJobID(), 
new JobVertexID())); + } + catch (Exception e) { + assertTrue(e.getMessage().contains("No local storage directories available")); + assertTrue(e.getMessage().contains(targetDir.getAbsolutePath())); + hasFailure = true; + } + assertTrue("We must see a failure because no storaged directory is feasible.", hasFailure); + } + finally { + //noinspection ResultOfMethodCallIgnored + targetDir.setWritable(true, false); + } + } + + @Test + public void testContinueOnSomeDbDirectoriesMissing() throws Exception { + File targetDir1 = tempFolder.newFolder(); + File targetDir2 = tempFolder.newFolder(); + + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + + try { + + if (!targetDir1.setWritable(false, false)) { + System.err.println("Cannot execute 'testContinueOnSomeDbDirectoriesMissing' because cannot mark directory non-writable"); + return; + } + + rocksDbBackend.setDbStoragePaths(targetDir1.getAbsolutePath(), targetDir2.getAbsolutePath()); + + try { + Environment env = getMockEnvironment(); + AbstractKeyedStateBackend<Integer> keyedStateBackend = rocksDbBackend.createKeyedStateBackend( + env, + env.getJobID(), + "foobar", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID())); + + IOUtils.closeQuietly(keyedStateBackend); + keyedStateBackend.dispose(); + } + catch (Exception e) { + e.printStackTrace(); + fail("Backend initialization failed even though some paths were available"); + } + } finally { + //noinspection ResultOfMethodCallIgnored + targetDir1.setWritable(true, false); + } + } + + // ------------------------------------------------------------------------ + // RocksDB Options + // ------------------------------------------------------------------------ + + @Test + public void testPredefinedOptions() throws Exception { + String checkpointPath = tempFolder.newFolder().toURI().toString(); + 
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + + assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions()); + + rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); + assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions()); + + try (ColumnFamilyOptions colCreated = rocksDbBackend.getColumnOptions()) { + assertEquals(CompactionStyle.LEVEL, colCreated.compactionStyle()); + } + } + + @Test + public void testOptionsFactory() throws Exception { + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + + rocksDbBackend.setOptions(new OptionsFactory() { + @Override + public DBOptions createDBOptions(DBOptions currentOptions) { + return currentOptions; + } + + @Override + public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { + return currentOptions.setCompactionStyle(CompactionStyle.FIFO); + } + }); + + assertNotNull(rocksDbBackend.getOptions()); + assertEquals(CompactionStyle.FIFO, rocksDbBackend.getColumnOptions().compactionStyle()); + } + + @Test + public void testPredefinedAndOptionsFactory() throws Exception { + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath); + + assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions()); + + rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); + rocksDbBackend.setOptions(new OptionsFactory() { + @Override + public DBOptions createDBOptions(DBOptions currentOptions) { + return currentOptions; + } + + @Override + public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) { + return currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL); + } + }); + + assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, 
rocksDbBackend.getPredefinedOptions()); + assertNotNull(rocksDbBackend.getOptions()); + assertEquals(CompactionStyle.UNIVERSAL, rocksDbBackend.getColumnOptions().compactionStyle()); + } + + @Test + public void testPredefinedOptionsEnum() { + for (PredefinedOptions o : PredefinedOptions.values()) { + try (DBOptions opt = o.createDBOptions()) { + assertNotNull(opt); + } + } + } + + // ------------------------------------------------------------------------ + // Reconfiguration + // ------------------------------------------------------------------------ + + @Test + public void testRocksDbReconfigurationCopiesExistingValues() throws Exception { + final FsStateBackend checkpointBackend = new FsStateBackend(tempFolder.newFolder().toURI().toString()); + final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue(); + + final RocksDBStateBackend original = new RocksDBStateBackend(checkpointBackend, incremental); + + // these must not be the default options + final PredefinedOptions predOptions = PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM; + assertNotEquals(predOptions, original.getPredefinedOptions()); + original.setPredefinedOptions(predOptions); + + final OptionsFactory optionsFactory = mock(OptionsFactory.class); + original.setOptions(optionsFactory); + + final String[] localDirs = new String[] { + tempFolder.newFolder().getAbsolutePath(), tempFolder.newFolder().getAbsolutePath() }; + original.setDbStoragePaths(localDirs); + + RocksDBStateBackend copy = original.configure(new Configuration()); + + assertEquals(original.isIncrementalCheckpointsEnabled(), copy.isIncrementalCheckpointsEnabled()); + assertArrayEquals(original.getDbStoragePaths(), copy.getDbStoragePaths()); + assertEquals(original.getOptions(), copy.getOptions()); + assertEquals(original.getPredefinedOptions(), copy.getPredefinedOptions()); + + FsStateBackend copyCheckpointBackend = (FsStateBackend) copy.getCheckpointBackend(); + 
assertEquals(checkpointBackend.getCheckpointPath(), copyCheckpointBackend.getCheckpointPath()); + assertEquals(checkpointBackend.getSavepointPath(), copyCheckpointBackend.getSavepointPath()); + } + + // ------------------------------------------------------------------------ + // Contained Non-partitioned State Backend + // ------------------------------------------------------------------------ + + @Test + public void testCallsForwardedToNonPartitionedBackend() throws Exception { + AbstractStateBackend storageBackend = new MemoryStateBackend(); + RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(storageBackend); + assertEquals(storageBackend, rocksDbBackend.getCheckpointBackend()); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + static Environment getMockEnvironment() { + return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) }); + } + + static Environment getMockEnvironment(File[] tempDirs) { + final String[] tempDirStrings = new String[tempDirs.length]; + for (int i = 0; i < tempDirs.length; i++) { + tempDirStrings[i] = tempDirs[i].getAbsolutePath(); + } + + IOManager ioMan = mock(IOManager.class); + when(ioMan.getSpillingDirectories()).thenReturn(tempDirs); + + Environment env = mock(Environment.class); + when(env.getJobID()).thenReturn(new JobID()); + when(env.getUserClassLoader()).thenReturn(RocksDBStateBackendConfigTest.class.getClassLoader()); + when(env.getIOManager()).thenReturn(ioMan); + when(env.getTaskKvStateRegistry()).thenReturn(new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID())); + + TaskInfo taskInfo = mock(TaskInfo.class); + when(env.getTaskInfo()).thenReturn(taskInfo); + when(taskInfo.getIndexOfThisSubtask()).thenReturn(0); + + TaskManagerRuntimeInfo tmInfo = new TestingTaskManagerRuntimeInfo(new Configuration(), tempDirStrings); + 
when(env.getTaskManagerInfo()).thenReturn(tmInfo); + + return env; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java new file mode 100644 index 0000000..7612c4c --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.Arrays; +import java.util.HashSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the RocksDBStateBackendFactory. + */ +public class RocksDBStateBackendFactoryTest { + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + private final ClassLoader cl = getClass().getClassLoader(); + + private final String backendKey = CheckpointingOptions.STATE_BACKEND.key(); + + // ------------------------------------------------------------------------ + + @Test + public void testFactoryName() { + // construct the name such that it will not be automatically adjusted on refactorings + String factoryName = "org.apache.flink.contrib.streaming.state.Roc"; + factoryName += "ksDBStateBackendFactory"; + + // !!! if this fails, the code in StateBackendLoader must be adjusted + assertEquals(factoryName, RocksDBStateBackendFactory.class.getName()); + } + + /** + * Validates loading a file system state backend with additional parameters from the cluster configuration. 
+ */ + @Test + public void testLoadFileSystemStateBackend() throws Exception { + final String checkpointDir = new Path(tmp.newFolder().toURI()).toString(); + final String savepointDir = new Path(tmp.newFolder().toURI()).toString(); + final String localDir1 = tmp.newFolder().getAbsolutePath(); + final String localDir2 = tmp.newFolder().getAbsolutePath(); + final String localDirs = localDir1 + File.pathSeparator + localDir2; + final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue(); + + final Path expectedCheckpointsPath = new Path(checkpointDir); + final Path expectedSavepointsPath = new Path(savepointDir); + + // we configure with the explicit string (rather than AbstractStateBackend#X_STATE_BACKEND_NAME) + // to guard against config-breaking changes of the name + final Configuration config1 = new Configuration(); + config1.setString(backendKey, "rocksdb"); + config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); + config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir); + config1.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs); + config1.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental); + + final Configuration config2 = new Configuration(); + config2.setString(backendKey, RocksDBStateBackendFactory.class.getName()); + config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); + config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir); + config2.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs); + config2.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental); + + StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null); + StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig(config2, cl, null); + + assertTrue(backend1 instanceof RocksDBStateBackend); + assertTrue(backend2 instanceof RocksDBStateBackend); + + RocksDBStateBackend fs1 = 
(RocksDBStateBackend) backend1; + RocksDBStateBackend fs2 = (RocksDBStateBackend) backend2; + + AbstractFileStateBackend fs1back = (AbstractFileStateBackend) fs1.getCheckpointBackend(); + AbstractFileStateBackend fs2back = (AbstractFileStateBackend) fs2.getCheckpointBackend(); + + assertEquals(expectedCheckpointsPath, fs1back.getCheckpointPath()); + assertEquals(expectedCheckpointsPath, fs2back.getCheckpointPath()); + assertEquals(expectedSavepointsPath, fs1back.getSavepointPath()); + assertEquals(expectedSavepointsPath, fs2back.getSavepointPath()); + assertEquals(incremental, fs1.isIncrementalCheckpointsEnabled()); + assertEquals(incremental, fs2.isIncrementalCheckpointsEnabled()); + checkPaths(fs1.getDbStoragePaths(), localDir1, localDir2); + checkPaths(fs2.getDbStoragePaths(), localDir1, localDir2); + } + + /** + * Validates taking the application-defined file system state backend and adding additional + * parameters from the cluster configuration, but giving precedence to application-defined + * parameters over configuration-defined parameters. 
+ */ + @Test + public void testLoadFileSystemStateBackendMixed() throws Exception { + final String appCheckpointDir = new Path(tmp.newFolder().toURI()).toString(); + final String checkpointDir = new Path(tmp.newFolder().toURI()).toString(); + final String savepointDir = new Path(tmp.newFolder().toURI()).toString(); + + final String localDir1 = tmp.newFolder().getAbsolutePath(); + final String localDir2 = tmp.newFolder().getAbsolutePath(); + final String localDir3 = tmp.newFolder().getAbsolutePath(); + final String localDir4 = tmp.newFolder().getAbsolutePath(); + + final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue(); + + final Path expectedCheckpointsPath = new Path(appCheckpointDir); + final Path expectedSavepointsPath = new Path(savepointDir); + + final RocksDBStateBackend backend = new RocksDBStateBackend(appCheckpointDir, incremental); + backend.setDbStoragePaths(localDir1, localDir2); + + final Configuration config = new Configuration(); + config.setString(backendKey, "jobmanager"); // this should not be picked up + config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this should not be picked up + config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir); + config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, !incremental); // this should not be picked up + config.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDir3 + ":" + localDir4); // this should not be picked up + + final StateBackend loadedBackend = + StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null); + assertTrue(loadedBackend instanceof RocksDBStateBackend); + + final RocksDBStateBackend loadedRocks = (RocksDBStateBackend) loadedBackend; + + assertEquals(incremental, loadedRocks.isIncrementalCheckpointsEnabled()); + checkPaths(loadedRocks.getDbStoragePaths(), localDir1, localDir2); + + AbstractFileStateBackend fsBackend = (AbstractFileStateBackend) 
loadedRocks.getCheckpointBackend(); + assertEquals(expectedCheckpointsPath, fsBackend.getCheckpointPath()); + assertEquals(expectedSavepointsPath, fsBackend.getSavepointPath()); + } + + // ------------------------------------------------------------------------ + + private static void checkPaths(String[] pathsArray, String... paths) { + assertNotNull(pathsArray); + assertNotNull(paths); + + assertEquals(pathsArray.length, paths.length); + + HashSet<String> pathsSet = new HashSet<>(Arrays.asList(pathsArray)); + + for (String path : paths) { + assertTrue(pathsSet.contains(path)); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java new file mode 100644 index 0000000..54af400 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -0,0 +1,526 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateBackendTestBase; +import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.filefilter.IOFileFilter; +import org.junit.After; +import org.junit.Rule; +import 
org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksIterator; +import org.rocksdb.RocksObject; +import org.rocksdb.Snapshot; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.RunnableFuture; + +import static junit.framework.TestCase.assertNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.verify; +import static org.mockito.internal.verification.VerificationModeFactory.times; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.spy; + +/** + * Tests for the partitioned state part of {@link RocksDBStateBackend}. 
+ */ +@RunWith(Parameterized.class) +public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBackend> { + + private OneShotLatch blocker; + private OneShotLatch waiter; + private BlockerCheckpointStreamFactory testStreamFactory; + private RocksDBKeyedStateBackend<Integer> keyedStateBackend; + private List<RocksObject> allCreatedCloseables; + private ValueState<Integer> testState1; + private ValueState<String> testState2; + + @Parameterized.Parameters(name = "Incremental checkpointing: {0}") + public static Collection<Boolean> parameters() { + return Arrays.asList(false, true); + } + + @Parameterized.Parameter + public boolean enableIncrementalCheckpointing; + + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + // Store it because we need it for the cleanup test. + private String dbPath; + + @Override + protected RocksDBStateBackend getStateBackend() throws IOException { + dbPath = tempFolder.newFolder().getAbsolutePath(); + String checkpointPath = tempFolder.newFolder().toURI().toString(); + RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), enableIncrementalCheckpointing); + backend.setDbStoragePath(dbPath); + return backend; + } + + // small safety net for instance cleanups, so that no native objects are left + @After + public void cleanupRocksDB() { + if (keyedStateBackend != null) { + IOUtils.closeQuietly(keyedStateBackend); + keyedStateBackend.dispose(); + } + + if (allCreatedCloseables != null) { + for (RocksObject rocksCloseable : allCreatedCloseables) { + verify(rocksCloseable, times(1)).close(); + } + allCreatedCloseables = null; + } + } + + public void setupRocksKeyedStateBackend() throws Exception { + + blocker = new OneShotLatch(); + waiter = new OneShotLatch(); + testStreamFactory = new BlockerCheckpointStreamFactory(1024 * 1024); + testStreamFactory.setBlockerLatch(blocker); + testStreamFactory.setWaiterLatch(waiter); + 
testStreamFactory.setAfterNumberInvocations(10); + + RocksDBStateBackend backend = getStateBackend(); + Environment env = new DummyEnvironment("TestTask", 1, 0); + + keyedStateBackend = (RocksDBKeyedStateBackend<Integer>) backend.createKeyedStateBackend( + env, + new JobID(), + "Test", + IntSerializer.INSTANCE, + 2, + new KeyGroupRange(0, 1), + mock(TaskKvStateRegistry.class)); + + keyedStateBackend.restore(null); + + testState1 = keyedStateBackend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>("TestState-1", Integer.class, 0)); + + testState2 = keyedStateBackend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + new ValueStateDescriptor<>("TestState-2", String.class, "")); + + allCreatedCloseables = new ArrayList<>(); + + keyedStateBackend.db = spy(keyedStateBackend.db); + + doAnswer(new Answer<Object>() { + + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + RocksIterator rocksIterator = spy((RocksIterator) invocationOnMock.callRealMethod()); + allCreatedCloseables.add(rocksIterator); + return rocksIterator; + } + }).when(keyedStateBackend.db).newIterator(any(ColumnFamilyHandle.class), any(ReadOptions.class)); + + doAnswer(new Answer<Object>() { + + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + Snapshot snapshot = spy((Snapshot) invocationOnMock.callRealMethod()); + allCreatedCloseables.add(snapshot); + return snapshot; + } + }).when(keyedStateBackend.db).getSnapshot(); + + doAnswer(new Answer<Object>() { + + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + ColumnFamilyHandle snapshot = spy((ColumnFamilyHandle) invocationOnMock.callRealMethod()); + allCreatedCloseables.add(snapshot); + return snapshot; + } + }).when(keyedStateBackend.db).createColumnFamily(any(ColumnFamilyDescriptor.class)); + + for (int i = 0; i < 100; ++i) { + 
keyedStateBackend.setCurrentKey(i); + testState1.update(4200 + i); + testState2.update("S-" + (4200 + i)); + } + } + + @Test + public void testCorrectMergeOperatorSet() throws IOException { + + final ColumnFamilyOptions columnFamilyOptions = spy(new ColumnFamilyOptions()); + RocksDBKeyedStateBackend<Integer> test = null; + try { + test = new RocksDBKeyedStateBackend<>( + "test", + Thread.currentThread().getContextClassLoader(), + tempFolder.newFolder(), + mock(DBOptions.class), + columnFamilyOptions, + mock(TaskKvStateRegistry.class), + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + new ExecutionConfig(), + enableIncrementalCheckpointing); + + verify(columnFamilyOptions, Mockito.times(1)) + .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME); + } finally { + if (test != null) { + IOUtils.closeQuietly(test); + test.dispose(); + } + columnFamilyOptions.close(); + } + } + + @Test + public void testReleasingSnapshotAfterBackendClosed() throws Exception { + setupRocksKeyedStateBackend(); + + try { + RunnableFuture<KeyedStateHandle> snapshot = + keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); + + RocksDB spyDB = keyedStateBackend.db; + + if (!enableIncrementalCheckpointing) { + verify(spyDB, times(1)).getSnapshot(); + verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class)); + } + + //Ensure every RocksObjects not closed yet + for (RocksObject rocksCloseable : allCreatedCloseables) { + verify(rocksCloseable, times(0)).close(); + } + + snapshot.cancel(true); + + this.keyedStateBackend.dispose(); + + verify(spyDB, times(1)).close(); + assertEquals(null, keyedStateBackend.db); + + //Ensure every RocksObjects was closed exactly once + for (RocksObject rocksCloseable : allCreatedCloseables) { + verify(rocksCloseable, times(1)).close(); + } + + } finally { + keyedStateBackend.dispose(); + keyedStateBackend = null; + } + } + + @Test + public void testDismissingSnapshot() throws Exception 
{ + setupRocksKeyedStateBackend(); + try { + RunnableFuture<KeyedStateHandle> snapshot = + keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); + snapshot.cancel(true); + verifyRocksObjectsReleased(); + } finally { + this.keyedStateBackend.dispose(); + this.keyedStateBackend = null; + } + } + + @Test + public void testDismissingSnapshotNotRunnable() throws Exception { + setupRocksKeyedStateBackend(); + try { + RunnableFuture<KeyedStateHandle> snapshot = + keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); + snapshot.cancel(true); + Thread asyncSnapshotThread = new Thread(snapshot); + asyncSnapshotThread.start(); + try { + snapshot.get(); + fail(); + } catch (Exception ignored) { + + } + asyncSnapshotThread.join(); + verifyRocksObjectsReleased(); + } finally { + this.keyedStateBackend.dispose(); + this.keyedStateBackend = null; + } + } + + @Test + public void testCompletingSnapshot() throws Exception { + setupRocksKeyedStateBackend(); + try { + RunnableFuture<KeyedStateHandle> snapshot = + keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); + Thread asyncSnapshotThread = new Thread(snapshot); + asyncSnapshotThread.start(); + waiter.await(); // wait for snapshot to run + waiter.reset(); + runStateUpdates(); + blocker.trigger(); // allow checkpointing to start writing + waiter.await(); // wait for snapshot stream writing to run + KeyedStateHandle keyedStateHandle = snapshot.get(); + assertNotNull(keyedStateHandle); + assertTrue(keyedStateHandle.getStateSize() > 0); + assertEquals(2, keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups()); + assertTrue(testStreamFactory.getLastCreatedStream().isClosed()); + asyncSnapshotThread.join(); + verifyRocksObjectsReleased(); + } finally { + this.keyedStateBackend.dispose(); + this.keyedStateBackend = null; + } + } + + @Test + public void 
testCancelRunningSnapshot() throws Exception { + setupRocksKeyedStateBackend(); + try { + RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); + Thread asyncSnapshotThread = new Thread(snapshot); + asyncSnapshotThread.start(); + waiter.await(); // wait for snapshot to run + waiter.reset(); + runStateUpdates(); + snapshot.cancel(true); + blocker.trigger(); // allow checkpointing to start writing + assertTrue(testStreamFactory.getLastCreatedStream().isClosed()); + waiter.await(); // wait for snapshot stream writing to run + try { + snapshot.get(); + fail(); + } catch (Exception ignored) { + } + + asyncSnapshotThread.join(); + verifyRocksObjectsReleased(); + } finally { + this.keyedStateBackend.dispose(); + this.keyedStateBackend = null; + } + } + + @Test + public void testDisposeDeletesAllDirectories() throws Exception { + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + Collection<File> allFilesInDbDir = + FileUtils.listFilesAndDirs(new File(dbPath), new AcceptAllFilter(), new AcceptAllFilter()); + try { + ValueStateDescriptor<String> kvId = + new ValueStateDescriptor<>("id", String.class, null); + + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + ValueState<String> state = + backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + backend.setCurrentKey(1); + state.update("Hello"); + + // more than just the root directory + assertTrue(allFilesInDbDir.size() > 1); + } finally { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + allFilesInDbDir = + FileUtils.listFilesAndDirs(new File(dbPath), new AcceptAllFilter(), new AcceptAllFilter()); + + // just the root directory left + assertEquals(1, allFilesInDbDir.size()); + } + + @Test + public void testSharedIncrementalStateDeRegistration() throws Exception { + if (enableIncrementalCheckpointing) { + 
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE); + try { + ValueStateDescriptor<String> kvId = + new ValueStateDescriptor<>("id", String.class, null); + + kvId.initializeSerializerUnlessSet(new ExecutionConfig()); + + ValueState<String> state = + backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + Queue<IncrementalKeyedStateHandle> previousStateHandles = new LinkedList<>(); + SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry()); + for (int checkpointId = 0; checkpointId < 3; ++checkpointId) { + + reset(sharedStateRegistry); + + backend.setCurrentKey(checkpointId); + state.update("Hello-" + checkpointId); + + RunnableFuture<KeyedStateHandle> snapshot = backend.snapshot( + checkpointId, + checkpointId, + createStreamFactory(), + CheckpointOptions.forCheckpointWithDefaultLocation()); + + snapshot.run(); + + IncrementalKeyedStateHandle stateHandle = (IncrementalKeyedStateHandle) snapshot.get(); + Map<StateHandleID, StreamStateHandle> sharedState = + new HashMap<>(stateHandle.getSharedState()); + + stateHandle.registerSharedStates(sharedStateRegistry); + + for (Map.Entry<StateHandleID, StreamStateHandle> e : sharedState.entrySet()) { + verify(sharedStateRegistry).registerReference( + stateHandle.createSharedStateRegistryKeyFromFileName(e.getKey()), + e.getValue()); + } + + previousStateHandles.add(stateHandle); + backend.notifyCheckpointComplete(checkpointId); + + //----------------------------------------------------------------- + + if (previousStateHandles.size() > 1) { + checkRemove(previousStateHandles.remove(), sharedStateRegistry); + } + } + + while (!previousStateHandles.isEmpty()) { + + reset(sharedStateRegistry); + + checkRemove(previousStateHandles.remove(), sharedStateRegistry); + } + } finally { + IOUtils.closeQuietly(backend); + backend.dispose(); + } + } + } + + private void checkRemove(IncrementalKeyedStateHandle remove, SharedStateRegistry registry) 
throws Exception { + for (StateHandleID id : remove.getSharedState().keySet()) { + verify(registry, times(0)).unregisterReference( + remove.createSharedStateRegistryKeyFromFileName(id)); + } + + remove.discardState(); + + for (StateHandleID id : remove.getSharedState().keySet()) { + verify(registry).unregisterReference( + remove.createSharedStateRegistryKeyFromFileName(id)); + } + } + + private void runStateUpdates() throws Exception{ + for (int i = 50; i < 150; ++i) { + if (i % 10 == 0) { + Thread.sleep(1); + } + keyedStateBackend.setCurrentKey(i); + testState1.update(4200 + i); + testState2.update("S-" + (4200 + i)); + } + } + + private void verifyRocksObjectsReleased() { + //Ensure every RocksObject was closed exactly once + for (RocksObject rocksCloseable : allCreatedCloseables) { + verify(rocksCloseable, times(1)).close(); + } + + assertNotNull(null, keyedStateBackend.db); + RocksDB spyDB = keyedStateBackend.db; + + if (!enableIncrementalCheckpointing) { + verify(spyDB, times(1)).getSnapshot(); + verify(spyDB, times(1)).releaseSnapshot(any(Snapshot.class)); + } + + keyedStateBackend.dispose(); + verify(spyDB, times(1)).close(); + assertEquals(null, keyedStateBackend.db); + } + + private static class AcceptAllFilter implements IOFileFilter { + @Override + public boolean accept(File file) { + return true; + } + + @Override + public boolean accept(File file, String s) { + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java new file mode 100644 
index 0000000..72c85ec --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.RocksDB; + +import java.lang.reflect.Method; +import java.net.URL; + +import static org.junit.Assert.assertNotEquals; + +/** + * This test validates that the RocksDB JNI library loading works properly + * in the presence of the RocksDB code being loaded dynamically via reflection. + * That can happen when RocksDB is in the user code JAR, or in certain test setups. 
+ */ +public class RocksDbMultiClassLoaderTest { + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + @Test + public void testTwoSeparateClassLoaders() throws Exception { + // collect the libraries / class folders with RocksDB related code: the state backend and RocksDB itself + final URL codePath1 = RocksDBStateBackend.class.getProtectionDomain().getCodeSource().getLocation(); + final URL codePath2 = RocksDB.class.getProtectionDomain().getCodeSource().getLocation(); + + final ClassLoader parent = getClass().getClassLoader(); + final ClassLoader loader1 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, parent, new String[0]); + final ClassLoader loader2 = FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, parent, new String[0]); + + final String className = RocksDBStateBackend.class.getName(); + + final Class<?> clazz1 = Class.forName(className, false, loader1); + final Class<?> clazz2 = Class.forName(className, false, loader2); + assertNotEquals("Test broken - the two reflectively loaded classes are equal", clazz1, clazz2); + + final Object instance1 = clazz1.getConstructor(String.class).newInstance(tmp.newFolder().toURI().toString()); + final Object instance2 = clazz2.getConstructor(String.class).newInstance(tmp.newFolder().toURI().toString()); + + final String tempDir = tmp.newFolder().getAbsolutePath(); + + final Method meth1 = clazz1.getDeclaredMethod("ensureRocksDBIsLoaded", String.class); + final Method meth2 = clazz2.getDeclaredMethod("ensureRocksDBIsLoaded", String.class); + meth1.setAccessible(true); + meth2.setAccessible(true); + + // if all is well, these methods can both complete successfully + meth1.invoke(instance1, tempDir); + meth2.invoke(instance2, tempDir); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java 
---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java new file mode 100644 index 0000000..670c355 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state.benchmark; + +import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; +import org.apache.flink.testutils.junit.RetryOnFailure; +import org.apache.flink.testutils.junit.RetryRule; +import org.apache.flink.util.TestLogger; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.CompactionStyle; +import org.rocksdb.NativeLibraryLoader; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.WriteOptions; + +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; + +/** + * Test that validates that the performance of APIs of RocksDB's ListState is as expected. + * + * <p>Benchmarking: + * + * <p>Computer: MacbookPro (Mid 2015), Flash Storage, Processor 2.5GHz Intel Core i7, Memory 16GB 1600MHz DDR3 + * Number of values added | time for add() | time for update() | perf improvement of update() over add() + * 10 236875 ns 17048 ns 13.90x + * 50 312332 ns 14281 ns 21.87x + * 100 393791 ns 18360 ns 21.45x + * 500 978703 ns 55397 ns 17.66x + * 1000 3044179 ns 89474 ns 34.02x + * 5000 9247395 ns 305580 ns 30.26x + * 10000 16416442 ns 605963 ns 27.09x + * 50000 84311205 ns 5691288 ns 14.81x + * 100000 195103310 ns 12914182 ns 15.11x + * 500000 1223141510 ns 70595881 ns 17.33x + * + * <p>In summary, update() API which pre-merges all values gives users 15-35x performance improvements. + * For most frequent use cases where there are a few hundreds to a few thousands values per key, + * users can get 30x - 35x performance improvement! 
+ * + */ +public class RocksDBListStatePerformanceTest extends TestLogger { + + private static final byte DELIMITER = ','; + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + @Rule + public final RetryRule retry = new RetryRule(); + + @Test(timeout = 2000) + @RetryOnFailure(times = 3) + public void testRocksDbListStateAPIs() throws Exception { + final File rocksDir = tmp.newFolder(); + + // ensure the RocksDB library is loaded to a distinct location each retry + NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath()); + + final String key1 = "key1"; + final String key2 = "key2"; + final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; + + final byte[] keyBytes1 = key1.getBytes(StandardCharsets.UTF_8); + final byte[] keyBytes2 = key2.getBytes(StandardCharsets.UTF_8); + final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); + + // The number of values added to ListState. Can be changed for benchmarking + final int num = 10; + + try ( + final Options options = new Options() + .setCompactionStyle(CompactionStyle.LEVEL) + .setLevelCompactionDynamicLevelBytes(true) + .setIncreaseParallelism(4) + .setUseFsync(false) + .setMaxOpenFiles(-1) + .setCreateIfMissing(true) + .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME); + + final WriteOptions writeOptions = new WriteOptions() + .setSync(false) + .setDisableWAL(true); + + final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) { + + // ----- add() API ----- + log.info("begin add"); + + final long beginInsert1 = System.nanoTime(); + for (int i = 0; i < num; i++) { + rocksDB.merge(writeOptions, keyBytes1, valueBytes); + } + final long endInsert1 = System.nanoTime(); + + log.info("end add - duration: {} ns", (endInsert1 - beginInsert1)); + + // ----- update() API ----- + + List<byte[]> list = new ArrayList<>(num); + for (int i = 0; i < num; i++) { + list.add(valueBytes); + } + byte[] premerged = 
merge(list); + + log.info("begin update"); + + final long beginInsert2 = System.nanoTime(); + rocksDB.merge(writeOptions, keyBytes2, premerged); + final long endInsert2 = System.nanoTime(); + + log.info("end update - duration: {} ns", (endInsert2 - beginInsert2)); + } + } + + /** + * Merge operands into a single value that can be put directly into RocksDB. + */ + public static byte[] merge(List<byte[]> operands) { + if (operands == null || operands.size() == 0) { + return null; + } + + if (operands.size() == 1) { + return operands.get(0); + } + + int numBytes = 0; + for (byte[] arr : operands) { + numBytes += arr.length + 1; + } + numBytes--; + + byte[] result = new byte[numBytes]; + + System.arraycopy(operands.get(0), 0, result, 0, operands.get(0).length); + + for (int i = 1, arrIndex = operands.get(0).length; i < operands.size(); i++) { + result[arrIndex] = DELIMITER; + arrIndex += 1; + System.arraycopy(operands.get(i), 0, result, arrIndex, operands.get(i).length); + arrIndex += operands.get(i).length; + } + + return result; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java new file mode 100644 index 0000000..e05e7ae --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.benchmark; + +import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; +import org.apache.flink.core.memory.MemoryUtils; +import org.apache.flink.testutils.junit.RetryOnFailure; +import org.apache.flink.testutils.junit.RetryRule; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.CompactionStyle; +import org.rocksdb.NativeLibraryLoader; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksIterator; +import org.rocksdb.WriteOptions; +import sun.misc.Unsafe; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +/** + * Test that validates that the performance of RocksDB is as expected. 
+ * This test guards against the bug filed as 'FLINK-5756' + */ +public class RocksDBPerformanceTest extends TestLogger { + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + @Rule + public final RetryRule retry = new RetryRule(); + + private File rocksDir; + private Options options; + private WriteOptions writeOptions; + + private final String key = "key"; + private final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"; + + private final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); + private final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8); + + @Before + public void init() throws IOException { + rocksDir = tmp.newFolder(); + + // ensure the RocksDB library is loaded to a distinct location each retry + NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath()); + + options = new Options() + .setCompactionStyle(CompactionStyle.LEVEL) + .setLevelCompactionDynamicLevelBytes(true) + .setIncreaseParallelism(4) + .setUseFsync(false) + .setMaxOpenFiles(-1) + .setCreateIfMissing(true) + .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME); + + writeOptions = new WriteOptions() + .setSync(false) + .setDisableWAL(true); + } + + @Test(timeout = 2000) + @RetryOnFailure(times = 3) + public void testRocksDbMergePerformance() throws Exception { + final int num = 50000; + + try (RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) { + + // ----- insert ----- + log.info("begin insert"); + + final long beginInsert = System.nanoTime(); + for (int i = 0; i < num; i++) { + rocksDB.merge(writeOptions, keyBytes, valueBytes); + } + final long endInsert = System.nanoTime(); + log.info("end insert - duration: {} ms", (endInsert - beginInsert) / 1_000_000); + + // ----- read (attempt 1) ----- + + final byte[] resultHolder = new byte[num * (valueBytes.length + 2)]; + final long beginGet1 = System.nanoTime(); + rocksDB.get(keyBytes, resultHolder); + final long 
endGet1 = System.nanoTime(); + + log.info("end get - duration: {} ms", (endGet1 - beginGet1) / 1_000_000); + + // ----- read (attempt 2) ----- + + final long beginGet2 = System.nanoTime(); + rocksDB.get(keyBytes, resultHolder); + final long endGet2 = System.nanoTime(); + + log.info("end get - duration: {} ms", (endGet2 - beginGet2) / 1_000_000); + + // ----- compact ----- + log.info("compacting..."); + final long beginCompact = System.nanoTime(); + rocksDB.compactRange(); + final long endCompact = System.nanoTime(); + + log.info("end compaction - duration: {} ms", (endCompact - beginCompact) / 1_000_000); + + // ----- read (attempt 3) ----- + + final long beginGet3 = System.nanoTime(); + rocksDB.get(keyBytes, resultHolder); + final long endGet3 = System.nanoTime(); + + log.info("end get - duration: {} ms", (endGet3 - beginGet3) / 1_000_000); + } + } + + @Test(timeout = 2000) + @RetryOnFailure(times = 3) + public void testRocksDbRangeGetPerformance() throws Exception { + final int num = 50000; + + try (RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath())) { + + final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4); + + final Unsafe unsafe = MemoryUtils.UNSAFE; + final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4; + + log.info("begin insert"); + + final long beginInsert = System.nanoTime(); + for (int i = 0; i < num; i++) { + unsafe.putInt(keyTemplate, offset, i); + rocksDB.put(writeOptions, keyTemplate, valueBytes); + } + final long endInsert = System.nanoTime(); + log.info("end insert - duration: {} ms", (endInsert - beginInsert) / 1_000_000); + + @SuppressWarnings("MismatchedReadAndWriteOfArray") + final byte[] resultHolder = new byte[num * valueBytes.length]; + + final long beginGet = System.nanoTime(); + + int pos = 0; + + try (final RocksIterator iterator = rocksDB.newIterator()) { + // seek to start + unsafe.putInt(keyTemplate, offset, 0); + iterator.seek(keyTemplate); + + // iterate + while 
(iterator.isValid() && samePrefix(keyBytes, iterator.key())) { + byte[] currValue = iterator.value(); + System.arraycopy(currValue, 0, resultHolder, pos, currValue.length); + pos += currValue.length; + iterator.next(); + } + } + + final long endGet = System.nanoTime(); + + log.info("end get - duration: {} ms", (endGet - beginGet) / 1_000_000); + } + } + + private static boolean samePrefix(byte[] prefix, byte[] key) { + for (int i = 0; i < prefix.length; i++) { + if (i >= key.length || prefix[i] != key[i]) { // a key shorter than the prefix cannot match + return false; + } + } + + return true; + } + + @After + public void close() { + options.close(); + writeOptions.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties b/flink-state-backends/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..881dc06 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF and its only appender to A1. +log4j.rootLogger=OFF, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4j.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4j.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/pom.xml ---------------------------------------------------------------------- diff --git a/flink-state-backends/pom.xml b/flink-state-backends/pom.xml new file mode 100644 index 0000000..c02ad84 --- /dev/null +++ b/flink-state-backends/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-parent</artifactId> + <version>1.5-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-state-backends</artifactId> + <name>flink-state-backends</name> + + <packaging>pom</packaging> + + <modules> + <module>flink-statebackend-rocksdb</module> + </modules> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9416200..5f0ebb4 100644 --- a/pom.xml +++ b/pom.xml @@ -73,6 +73,7 @@ under the License. <module>flink-tests</module> <module>flink-end-to-end-tests</module> <module>flink-test-utils-parent</module> + <module>flink-state-backends</module> <module>flink-libraries</module> <module>flink-scala-shell</module> <module>flink-quickstart</module> http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/tools/travis_mvn_watchdog.sh ---------------------------------------------------------------------- diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh index 71b4115..82493ce 100755 --- a/tools/travis_mvn_watchdog.sh +++ b/tools/travis_mvn_watchdog.sh @@ -45,7 +45,7 @@ LOG4J_PROPERTIES=${HERE}/log4j-travis.properties MODULES_CORE="\ flink-test-utils-parent/flink-test-utils,\ -flink-contrib/flink-statebackend-rocksdb,\ +flink-state-backends/flink-statebackend-rocksdb,\ flink-clients,\ flink-core,\ flink-java,\