Repository: flink
Updated Branches:
  refs/heads/master 3d52f52e9 -> 2f7392d77


http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
new file mode 100644
index 0000000..3a39ba0
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
+import org.apache.flink.util.IOUtils;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.DBOptions;
+
+import java.io.File;
+
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for configuring the RocksDB State Backend.
+ */
+@SuppressWarnings("serial")
+public class RocksDBStateBackendConfigTest {
+
+       @Rule
+       public TemporaryFolder tempFolder = new TemporaryFolder();
+
+       // 
------------------------------------------------------------------------
+       //  RocksDB local file directory
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testDefaultsInSync() throws Exception {
+               final boolean defaultIncremental = 
CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
+
+               RocksDBStateBackend backend = new 
RocksDBStateBackend(tempFolder.newFolder().toURI());
+               assertEquals(defaultIncremental, 
backend.isIncrementalCheckpointsEnabled());
+       }
+
+       @Test
+       public void testSetDbPath() throws Exception {
+               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+               File testDir1 = tempFolder.newFolder();
+               File testDir2 = tempFolder.newFolder();
+
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+
+               assertNull(rocksDbBackend.getDbStoragePaths());
+
+               rocksDbBackend.setDbStoragePath(testDir1.getAbsolutePath());
+               assertArrayEquals(new String[] { new 
Path(testDir1.getAbsolutePath()).toString() }, 
rocksDbBackend.getDbStoragePaths());
+
+               rocksDbBackend.setDbStoragePath(null);
+               assertNull(rocksDbBackend.getDbStoragePaths());
+
+               rocksDbBackend.setDbStoragePaths(testDir1.getAbsolutePath(), 
testDir2.getAbsolutePath());
+               assertArrayEquals(new String[] { new 
Path(testDir1.getAbsolutePath()).toString(), new 
Path(testDir2.getAbsolutePath()).toString() }, 
rocksDbBackend.getDbStoragePaths());
+
+               Environment env = getMockEnvironment();
+               RocksDBKeyedStateBackend<Integer> keyedBackend = 
(RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
+                               createKeyedStateBackend(
+                                               env,
+                                               env.getJobID(),
+                                               "test_op",
+                                               IntSerializer.INSTANCE,
+                                               1,
+                                               new KeyGroupRange(0, 0),
+                                               env.getTaskKvStateRegistry());
+
+               try {
+                       File instanceBasePath = 
keyedBackend.getInstanceBasePath();
+                       assertThat(instanceBasePath.getAbsolutePath(), 
anyOf(startsWith(testDir1.getAbsolutePath()), 
startsWith(testDir2.getAbsolutePath())));
+
+                       //noinspection NullArgumentToVariableArgMethod
+                       rocksDbBackend.setDbStoragePaths(null);
+                       assertNull(rocksDbBackend.getDbStoragePaths());
+               } finally {
+                       IOUtils.closeQuietly(keyedBackend);
+                       keyedBackend.dispose();
+               }
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testSetNullPaths() throws Exception {
+               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+               rocksDbBackend.setDbStoragePaths();
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testNonFileSchemePath() throws Exception {
+               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+               
rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition");
+       }
+
+       // 
------------------------------------------------------------------------
+       //  RocksDB local file automatic from temp directories
+       // 
------------------------------------------------------------------------
+
+       /**
+        * This tests whether the RocksDB backends uses the temp directories 
that are provided
+        * from the {@link Environment} when no db storage path is set.
+        */
+       @Test
+       public void testUseTempDirectories() throws Exception {
+               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+
+               File dir1 = tempFolder.newFolder();
+               File dir2 = tempFolder.newFolder();
+
+               File[] tempDirs = new File[] { dir1, dir2 };
+
+               assertNull(rocksDbBackend.getDbStoragePaths());
+
+               Environment env = getMockEnvironment(tempDirs);
+               RocksDBKeyedStateBackend<Integer> keyedBackend = 
(RocksDBKeyedStateBackend<Integer>) rocksDbBackend.
+                               createKeyedStateBackend(
+                                               env,
+                                               env.getJobID(),
+                                               "test_op",
+                                               IntSerializer.INSTANCE,
+                                               1,
+                                               new KeyGroupRange(0, 0),
+                                               env.getTaskKvStateRegistry());
+
+               try {
+                       File instanceBasePath = 
keyedBackend.getInstanceBasePath();
+                       assertThat(instanceBasePath.getAbsolutePath(), 
anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath())));
+               } finally {
+                       IOUtils.closeQuietly(keyedBackend);
+                       keyedBackend.dispose();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  RocksDB local file directory initialization
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testFailWhenNoLocalStorageDir() throws Exception {
+               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+               File targetDir = tempFolder.newFolder();
+
+               try {
+                       if (!targetDir.setWritable(false, false)) {
+                               System.err.println("Cannot execute 
'testFailWhenNoLocalStorageDir' because cannot mark directory non-writable");
+                               return;
+                       }
+
+                       
rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath());
+
+                       boolean hasFailure = false;
+                       try {
+                               Environment env = getMockEnvironment();
+                               rocksDbBackend.createKeyedStateBackend(
+                                               env,
+                                               env.getJobID(),
+                                               "foobar",
+                                               IntSerializer.INSTANCE,
+                                               1,
+                                               new KeyGroupRange(0, 0),
+                                               new 
KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
+                       }
+                       catch (Exception e) {
+                               assertTrue(e.getMessage().contains("No local 
storage directories available"));
+                               
assertTrue(e.getMessage().contains(targetDir.getAbsolutePath()));
+                               hasFailure = true;
+                       }
+                       assertTrue("We must see a failure because no storaged 
directory is feasible.", hasFailure);
+               }
+               finally {
+                       //noinspection ResultOfMethodCallIgnored
+                       targetDir.setWritable(true, false);
+               }
+       }
+
+       @Test
+       public void testContinueOnSomeDbDirectoriesMissing() throws Exception {
+               File targetDir1 = tempFolder.newFolder();
+               File targetDir2 = tempFolder.newFolder();
+
+               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+
+               try {
+
+                       if (!targetDir1.setWritable(false, false)) {
+                               System.err.println("Cannot execute 
'testContinueOnSomeDbDirectoriesMissing' because cannot mark directory 
non-writable");
+                               return;
+                       }
+
+                       
rocksDbBackend.setDbStoragePaths(targetDir1.getAbsolutePath(), 
targetDir2.getAbsolutePath());
+
+                       try {
+                               Environment env = getMockEnvironment();
+                               AbstractKeyedStateBackend<Integer> 
keyedStateBackend = rocksDbBackend.createKeyedStateBackend(
+                                       env,
+                                       env.getJobID(),
+                                       "foobar",
+                                       IntSerializer.INSTANCE,
+                                       1,
+                                       new KeyGroupRange(0, 0),
+                                       new 
KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
+
+                               IOUtils.closeQuietly(keyedStateBackend);
+                               keyedStateBackend.dispose();
+                       }
+                       catch (Exception e) {
+                               e.printStackTrace();
+                               fail("Backend initialization failed even though 
some paths were available");
+                       }
+               } finally {
+                       //noinspection ResultOfMethodCallIgnored
+                       targetDir1.setWritable(true, false);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  RocksDB Options
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testPredefinedOptions() throws Exception {
+               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+
+               assertEquals(PredefinedOptions.DEFAULT, 
rocksDbBackend.getPredefinedOptions());
+
+               
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
+               assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, 
rocksDbBackend.getPredefinedOptions());
+
+               try (ColumnFamilyOptions colCreated = 
rocksDbBackend.getColumnOptions()) {
+                       assertEquals(CompactionStyle.LEVEL, 
colCreated.compactionStyle());
+               }
+       }
+
+       @Test
+       public void testOptionsFactory() throws Exception {
+               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+
+               rocksDbBackend.setOptions(new OptionsFactory() {
+                       @Override
+                       public DBOptions createDBOptions(DBOptions 
currentOptions) {
+                               return currentOptions;
+                       }
+
+                       @Override
+                       public ColumnFamilyOptions 
createColumnOptions(ColumnFamilyOptions currentOptions) {
+                               return 
currentOptions.setCompactionStyle(CompactionStyle.FIFO);
+                       }
+               });
+
+               assertNotNull(rocksDbBackend.getOptions());
+               assertEquals(CompactionStyle.FIFO, 
rocksDbBackend.getColumnOptions().compactionStyle());
+       }
+
+       @Test
+       public void testPredefinedAndOptionsFactory() throws Exception {
+               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(checkpointPath);
+
+               assertEquals(PredefinedOptions.DEFAULT, 
rocksDbBackend.getPredefinedOptions());
+
+               
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
+               rocksDbBackend.setOptions(new OptionsFactory() {
+                       @Override
+                       public DBOptions createDBOptions(DBOptions 
currentOptions) {
+                               return currentOptions;
+                       }
+
+                       @Override
+                       public ColumnFamilyOptions 
createColumnOptions(ColumnFamilyOptions currentOptions) {
+                               return 
currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
+                       }
+               });
+
+               assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, 
rocksDbBackend.getPredefinedOptions());
+               assertNotNull(rocksDbBackend.getOptions());
+               assertEquals(CompactionStyle.UNIVERSAL, 
rocksDbBackend.getColumnOptions().compactionStyle());
+       }
+
+       @Test
+       public void testPredefinedOptionsEnum() {
+               for (PredefinedOptions o : PredefinedOptions.values()) {
+                       try (DBOptions opt = o.createDBOptions()) {
+                               assertNotNull(opt);
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Reconfiguration
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testRocksDbReconfigurationCopiesExistingValues() throws 
Exception {
+               final FsStateBackend checkpointBackend = new 
FsStateBackend(tempFolder.newFolder().toURI().toString());
+               final boolean incremental = 
!CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
+
+               final RocksDBStateBackend original = new 
RocksDBStateBackend(checkpointBackend, incremental);
+
+               // these must not be the default options
+               final PredefinedOptions predOptions = 
PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM;
+               assertNotEquals(predOptions, original.getPredefinedOptions());
+               original.setPredefinedOptions(predOptions);
+
+               final OptionsFactory optionsFactory = 
mock(OptionsFactory.class);
+               original.setOptions(optionsFactory);
+
+               final String[] localDirs = new String[] {
+                               tempFolder.newFolder().getAbsolutePath(), 
tempFolder.newFolder().getAbsolutePath() };
+               original.setDbStoragePaths(localDirs);
+
+               RocksDBStateBackend copy = original.configure(new 
Configuration());
+
+               assertEquals(original.isIncrementalCheckpointsEnabled(), 
copy.isIncrementalCheckpointsEnabled());
+               assertArrayEquals(original.getDbStoragePaths(), 
copy.getDbStoragePaths());
+               assertEquals(original.getOptions(), copy.getOptions());
+               assertEquals(original.getPredefinedOptions(), 
copy.getPredefinedOptions());
+
+               FsStateBackend copyCheckpointBackend = (FsStateBackend) 
copy.getCheckpointBackend();
+               assertEquals(checkpointBackend.getCheckpointPath(), 
copyCheckpointBackend.getCheckpointPath());
+               assertEquals(checkpointBackend.getSavepointPath(), 
copyCheckpointBackend.getSavepointPath());
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Contained Non-partitioned State Backend
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testCallsForwardedToNonPartitionedBackend() throws 
Exception {
+               AbstractStateBackend storageBackend = new MemoryStateBackend();
+               RocksDBStateBackend rocksDbBackend = new 
RocksDBStateBackend(storageBackend);
+               assertEquals(storageBackend, 
rocksDbBackend.getCheckpointBackend());
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
+       static Environment getMockEnvironment() {
+               return getMockEnvironment(new File[] { new 
File(System.getProperty("java.io.tmpdir")) });
+       }
+
+       static Environment getMockEnvironment(File[] tempDirs) {
+               final String[] tempDirStrings = new String[tempDirs.length];
+               for (int i = 0; i < tempDirs.length; i++) {
+                       tempDirStrings[i] = tempDirs[i].getAbsolutePath();
+               }
+
+               IOManager ioMan = mock(IOManager.class);
+               when(ioMan.getSpillingDirectories()).thenReturn(tempDirs);
+
+               Environment env = mock(Environment.class);
+               when(env.getJobID()).thenReturn(new JobID());
+               
when(env.getUserClassLoader()).thenReturn(RocksDBStateBackendConfigTest.class.getClassLoader());
+               when(env.getIOManager()).thenReturn(ioMan);
+               when(env.getTaskKvStateRegistry()).thenReturn(new 
KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));
+
+               TaskInfo taskInfo = mock(TaskInfo.class);
+               when(env.getTaskInfo()).thenReturn(taskInfo);
+               when(taskInfo.getIndexOfThisSubtask()).thenReturn(0);
+
+               TaskManagerRuntimeInfo tmInfo = new 
TestingTaskManagerRuntimeInfo(new Configuration(), tempDirStrings);
+               when(env.getTaskManagerInfo()).thenReturn(tmInfo);
+
+               return env;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
new file mode 100644
index 0000000..7612c4c
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactoryTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the RocksDBStateBackendFactory.
+ */
+public class RocksDBStateBackendFactoryTest {
+
+       @Rule
+       public final TemporaryFolder tmp = new TemporaryFolder();
+
+       private final ClassLoader cl = getClass().getClassLoader();
+
+       private final String backendKey = 
CheckpointingOptions.STATE_BACKEND.key();
+
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testFactoryName() {
+               // construct the name such that it will not be automatically 
adjusted on refactorings
+               String factoryName = 
"org.apache.flink.contrib.streaming.state.Roc";
+               factoryName += "ksDBStateBackendFactory";
+
+               // !!! if this fails, the code in StateBackendLoader must be 
adjusted
+               assertEquals(factoryName, 
RocksDBStateBackendFactory.class.getName());
+       }
+
+       /**
+        * Validates loading a file system state backend with additional 
parameters from the cluster configuration.
+        */
+       @Test
+       public void testLoadFileSystemStateBackend() throws Exception {
+               final String checkpointDir = new 
Path(tmp.newFolder().toURI()).toString();
+               final String savepointDir = new 
Path(tmp.newFolder().toURI()).toString();
+               final String localDir1 = tmp.newFolder().getAbsolutePath();
+               final String localDir2 = tmp.newFolder().getAbsolutePath();
+               final String localDirs = localDir1 + File.pathSeparator + 
localDir2;
+               final boolean incremental = 
!CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
+
+               final Path expectedCheckpointsPath = new Path(checkpointDir);
+               final Path expectedSavepointsPath = new Path(savepointDir);
+
+               // we configure with the explicit string (rather than 
AbstractStateBackend#X_STATE_BACKEND_NAME)
+               // to guard against config-breaking changes of the name
+               final Configuration config1 = new Configuration();
+               config1.setString(backendKey, "rocksdb");
+               config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir);
+               config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir);
+               
config1.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs);
+               
config1.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
+
+               final Configuration config2 = new Configuration();
+               config2.setString(backendKey, 
RocksDBStateBackendFactory.class.getName());
+               config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir);
+               config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir);
+               
config2.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs);
+               
config2.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
+
+               StateBackend backend1 = 
StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
+               StateBackend backend2 = 
StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);
+
+               assertTrue(backend1 instanceof RocksDBStateBackend);
+               assertTrue(backend2 instanceof RocksDBStateBackend);
+
+               RocksDBStateBackend fs1 = (RocksDBStateBackend) backend1;
+               RocksDBStateBackend fs2 = (RocksDBStateBackend) backend2;
+
+               AbstractFileStateBackend fs1back = (AbstractFileStateBackend) 
fs1.getCheckpointBackend();
+               AbstractFileStateBackend fs2back = (AbstractFileStateBackend) 
fs2.getCheckpointBackend();
+
+               assertEquals(expectedCheckpointsPath, 
fs1back.getCheckpointPath());
+               assertEquals(expectedCheckpointsPath, 
fs2back.getCheckpointPath());
+               assertEquals(expectedSavepointsPath, 
fs1back.getSavepointPath());
+               assertEquals(expectedSavepointsPath, 
fs2back.getSavepointPath());
+               assertEquals(incremental, 
fs1.isIncrementalCheckpointsEnabled());
+               assertEquals(incremental, 
fs2.isIncrementalCheckpointsEnabled());
+               checkPaths(fs1.getDbStoragePaths(), localDir1, localDir2);
+               checkPaths(fs2.getDbStoragePaths(), localDir1, localDir2);
+       }
+
+       /**
+        * Validates taking the application-defined file system state backend 
and adding with additional
+        * parameters from the cluster configuration, but giving precedence to 
application-defined
+        * parameters over configuration-defined parameters.
+        */
+       @Test
+       public void testLoadFileSystemStateBackendMixed() throws Exception {
+               final String appCheckpointDir = new 
Path(tmp.newFolder().toURI()).toString();
+               final String checkpointDir = new 
Path(tmp.newFolder().toURI()).toString();
+               final String savepointDir = new 
Path(tmp.newFolder().toURI()).toString();
+
+               final String localDir1 = tmp.newFolder().getAbsolutePath();
+               final String localDir2 = tmp.newFolder().getAbsolutePath();
+               final String localDir3 = tmp.newFolder().getAbsolutePath();
+               final String localDir4 = tmp.newFolder().getAbsolutePath();
+
+               final boolean incremental = 
!CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
+
+               final Path expectedCheckpointsPath = new Path(appCheckpointDir);
+               final Path expectedSavepointsPath = new Path(savepointDir);
+
+               final RocksDBStateBackend backend = new 
RocksDBStateBackend(appCheckpointDir, incremental);
+               backend.setDbStoragePaths(localDir1, localDir2);
+
+               final Configuration config = new Configuration();
+               config.setString(backendKey, "jobmanager"); // this should not 
be picked up
+               config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir); // this should not be picked up
+               config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir);
+               config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, 
!incremental);  // this should not be picked up
+               
config.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDir3 + 
":" + localDir4);  // this should not be picked up
+
+               final StateBackend loadedBackend =
+                               
StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);
+               assertTrue(loadedBackend instanceof RocksDBStateBackend);
+
+               final RocksDBStateBackend loadedRocks = (RocksDBStateBackend) 
loadedBackend;
+
+               assertEquals(incremental, 
loadedRocks.isIncrementalCheckpointsEnabled());
+               checkPaths(loadedRocks.getDbStoragePaths(), localDir1, 
localDir2);
+
+               AbstractFileStateBackend fsBackend = (AbstractFileStateBackend) 
loadedRocks.getCheckpointBackend();
+               assertEquals(expectedCheckpointsPath, 
fsBackend.getCheckpointPath());
+               assertEquals(expectedSavepointsPath, 
fsBackend.getSavepointPath());
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static void checkPaths(String[] pathsArray, String... paths) {
+               assertNotNull(pathsArray);
+               assertNotNull(paths);
+
+               assertEquals(pathsArray.length, paths.length);
+
+               HashSet<String> pathsSet = new 
HashSet<>(Arrays.asList(pathsArray));
+
+               for (String path : paths) {
+                       assertTrue(pathsSet.contains(path));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
new file mode 100644
index 0000000..54af400
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateBackendTestBase;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.filefilter.IOFileFilter;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.RocksObject;
+import org.rocksdb.Snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.RunnableFuture;
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+/**
+ * Tests for the partitioned state part of {@link RocksDBStateBackend}.
+ */
+@RunWith(Parameterized.class)
+public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBackend> {
+
+       private OneShotLatch blocker;
+       private OneShotLatch waiter;
+       private BlockerCheckpointStreamFactory testStreamFactory;
+       private RocksDBKeyedStateBackend<Integer> keyedStateBackend;
+       private List<RocksObject> allCreatedCloseables;
+       private ValueState<Integer> testState1;
+       private ValueState<String> testState2;
+
+       @Parameterized.Parameters(name = "Incremental checkpointing: {0}")
+       public static Collection<Boolean> parameters() {
+               return Arrays.asList(false, true);
+       }
+
+       @Parameterized.Parameter
+       public boolean enableIncrementalCheckpointing;
+
+       @Rule
+       public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+       // Store it because we need it for the cleanup test.
+       private String dbPath;
+
+       @Override
+       protected RocksDBStateBackend getStateBackend() throws IOException {
+               dbPath = tempFolder.newFolder().getAbsolutePath();
+               String checkpointPath = 
tempFolder.newFolder().toURI().toString();
+               RocksDBStateBackend backend = new RocksDBStateBackend(new 
FsStateBackend(checkpointPath), enableIncrementalCheckpointing);
+               backend.setDbStoragePath(dbPath);
+               return backend;
+       }
+
+       // small safety net for instance cleanups, so that no native objects 
are left
+       @After
+       public void cleanupRocksDB() {
+               if (keyedStateBackend != null) {
+                       IOUtils.closeQuietly(keyedStateBackend);
+                       keyedStateBackend.dispose();
+               }
+
+               if (allCreatedCloseables != null) {
+                       for (RocksObject rocksCloseable : allCreatedCloseables) 
{
+                               verify(rocksCloseable, times(1)).close();
+                       }
+                       allCreatedCloseables = null;
+               }
+       }
+
+       public void setupRocksKeyedStateBackend() throws Exception {
+
+               blocker = new OneShotLatch();
+               waiter = new OneShotLatch();
+               testStreamFactory = new BlockerCheckpointStreamFactory(1024 * 
1024);
+               testStreamFactory.setBlockerLatch(blocker);
+               testStreamFactory.setWaiterLatch(waiter);
+               testStreamFactory.setAfterNumberInvocations(10);
+
+               RocksDBStateBackend backend = getStateBackend();
+               Environment env = new DummyEnvironment("TestTask", 1, 0);
+
+               keyedStateBackend = (RocksDBKeyedStateBackend<Integer>) 
backend.createKeyedStateBackend(
+                               env,
+                               new JobID(),
+                               "Test",
+                               IntSerializer.INSTANCE,
+                               2,
+                               new KeyGroupRange(0, 1),
+                               mock(TaskKvStateRegistry.class));
+
+               keyedStateBackend.restore(null);
+
+               testState1 = keyedStateBackend.getPartitionedState(
+                               VoidNamespace.INSTANCE,
+                               VoidNamespaceSerializer.INSTANCE,
+                               new ValueStateDescriptor<>("TestState-1", 
Integer.class, 0));
+
+               testState2 = keyedStateBackend.getPartitionedState(
+                               VoidNamespace.INSTANCE,
+                               VoidNamespaceSerializer.INSTANCE,
+                               new ValueStateDescriptor<>("TestState-2", 
String.class, ""));
+
+               allCreatedCloseables = new ArrayList<>();
+
+               keyedStateBackend.db = spy(keyedStateBackend.db);
+
+               doAnswer(new Answer<Object>() {
+
+                       @Override
+                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                               RocksIterator rocksIterator = 
spy((RocksIterator) invocationOnMock.callRealMethod());
+                               allCreatedCloseables.add(rocksIterator);
+                               return rocksIterator;
+                       }
+               
}).when(keyedStateBackend.db).newIterator(any(ColumnFamilyHandle.class), 
any(ReadOptions.class));
+
+               doAnswer(new Answer<Object>() {
+
+                       @Override
+                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                               Snapshot snapshot = spy((Snapshot) 
invocationOnMock.callRealMethod());
+                               allCreatedCloseables.add(snapshot);
+                               return snapshot;
+                       }
+               }).when(keyedStateBackend.db).getSnapshot();
+
+               doAnswer(new Answer<Object>() {
+
+                       @Override
+                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                               ColumnFamilyHandle snapshot = 
spy((ColumnFamilyHandle) invocationOnMock.callRealMethod());
+                               allCreatedCloseables.add(snapshot);
+                               return snapshot;
+                       }
+               
}).when(keyedStateBackend.db).createColumnFamily(any(ColumnFamilyDescriptor.class));
+
+               for (int i = 0; i < 100; ++i) {
+                       keyedStateBackend.setCurrentKey(i);
+                       testState1.update(4200 + i);
+                       testState2.update("S-" + (4200 + i));
+               }
+       }
+
+       @Test
+       public void testCorrectMergeOperatorSet() throws IOException {
+
+               final ColumnFamilyOptions columnFamilyOptions = spy(new 
ColumnFamilyOptions());
+               RocksDBKeyedStateBackend<Integer> test = null;
+               try {
+                       test = new RocksDBKeyedStateBackend<>(
+                               "test",
+                               Thread.currentThread().getContextClassLoader(),
+                               tempFolder.newFolder(),
+                               mock(DBOptions.class),
+                               columnFamilyOptions,
+                               mock(TaskKvStateRegistry.class),
+                               IntSerializer.INSTANCE,
+                               1,
+                               new KeyGroupRange(0, 0),
+                               new ExecutionConfig(),
+                               enableIncrementalCheckpointing);
+
+                       verify(columnFamilyOptions, Mockito.times(1))
+                               
.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
+               } finally {
+                       if (test != null) {
+                               IOUtils.closeQuietly(test);
+                               test.dispose();
+                       }
+                       columnFamilyOptions.close();
+               }
+       }
+
+       @Test
+       public void testReleasingSnapshotAfterBackendClosed() throws Exception {
+               setupRocksKeyedStateBackend();
+
+               try {
+                       RunnableFuture<KeyedStateHandle> snapshot =
+                               keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
+
+                       RocksDB spyDB = keyedStateBackend.db;
+
+                       if (!enableIncrementalCheckpointing) {
+                               verify(spyDB, times(1)).getSnapshot();
+                               verify(spyDB, 
times(0)).releaseSnapshot(any(Snapshot.class));
+                       }
+
+                       //Ensure every RocksObjects not closed yet
+                       for (RocksObject rocksCloseable : allCreatedCloseables) 
{
+                               verify(rocksCloseable, times(0)).close();
+                       }
+
+                       snapshot.cancel(true);
+
+                       this.keyedStateBackend.dispose();
+
+                       verify(spyDB, times(1)).close();
+                       assertEquals(null, keyedStateBackend.db);
+
+                       //Ensure every RocksObjects was closed exactly once
+                       for (RocksObject rocksCloseable : allCreatedCloseables) 
{
+                               verify(rocksCloseable, times(1)).close();
+                       }
+
+               } finally {
+                       keyedStateBackend.dispose();
+                       keyedStateBackend = null;
+               }
+       }
+
+       @Test
+       public void testDismissingSnapshot() throws Exception {
+               setupRocksKeyedStateBackend();
+               try {
+                       RunnableFuture<KeyedStateHandle> snapshot =
+                               keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
+                       snapshot.cancel(true);
+                       verifyRocksObjectsReleased();
+               } finally {
+                       this.keyedStateBackend.dispose();
+                       this.keyedStateBackend = null;
+               }
+       }
+
+       @Test
+       public void testDismissingSnapshotNotRunnable() throws Exception {
+               setupRocksKeyedStateBackend();
+               try {
+                       RunnableFuture<KeyedStateHandle> snapshot =
+                               keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
+                       snapshot.cancel(true);
+                       Thread asyncSnapshotThread = new Thread(snapshot);
+                       asyncSnapshotThread.start();
+                       try {
+                               snapshot.get();
+                               fail();
+                       } catch (Exception ignored) {
+
+                       }
+                       asyncSnapshotThread.join();
+                       verifyRocksObjectsReleased();
+               } finally {
+                       this.keyedStateBackend.dispose();
+                       this.keyedStateBackend = null;
+               }
+       }
+
+       @Test
+       public void testCompletingSnapshot() throws Exception {
+               setupRocksKeyedStateBackend();
+               try {
+                       RunnableFuture<KeyedStateHandle> snapshot =
+                               keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
+                       Thread asyncSnapshotThread = new Thread(snapshot);
+                       asyncSnapshotThread.start();
+                       waiter.await(); // wait for snapshot to run
+                       waiter.reset();
+                       runStateUpdates();
+                       blocker.trigger(); // allow checkpointing to start 
writing
+                       waiter.await(); // wait for snapshot stream writing to 
run
+                       KeyedStateHandle keyedStateHandle = snapshot.get();
+                       assertNotNull(keyedStateHandle);
+                       assertTrue(keyedStateHandle.getStateSize() > 0);
+                       assertEquals(2, 
keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
+                       
assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+                       asyncSnapshotThread.join();
+                       verifyRocksObjectsReleased();
+               } finally {
+                       this.keyedStateBackend.dispose();
+                       this.keyedStateBackend = null;
+               }
+       }
+
+       @Test
+       public void testCancelRunningSnapshot() throws Exception {
+               setupRocksKeyedStateBackend();
+               try {
+                       RunnableFuture<KeyedStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+                       Thread asyncSnapshotThread = new Thread(snapshot);
+                       asyncSnapshotThread.start();
+                       waiter.await(); // wait for snapshot to run
+                       waiter.reset();
+                       runStateUpdates();
+                       snapshot.cancel(true);
+                       blocker.trigger(); // allow checkpointing to start 
writing
+                       
assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+                       waiter.await(); // wait for snapshot stream writing to 
run
+                       try {
+                               snapshot.get();
+                               fail();
+                       } catch (Exception ignored) {
+                       }
+
+                       asyncSnapshotThread.join();
+                       verifyRocksObjectsReleased();
+               } finally {
+                       this.keyedStateBackend.dispose();
+                       this.keyedStateBackend = null;
+               }
+       }
+
+       @Test
+       public void testDisposeDeletesAllDirectories() throws Exception {
+               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+               Collection<File> allFilesInDbDir =
+                       FileUtils.listFilesAndDirs(new File(dbPath), new 
AcceptAllFilter(), new AcceptAllFilter());
+               try {
+                       ValueStateDescriptor<String> kvId =
+                               new ValueStateDescriptor<>("id", String.class, 
null);
+
+                       kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
+
+                       ValueState<String> state =
+                               
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+                       backend.setCurrentKey(1);
+                       state.update("Hello");
+
+                       // more than just the root directory
+                       assertTrue(allFilesInDbDir.size() > 1);
+               } finally {
+                       IOUtils.closeQuietly(backend);
+                       backend.dispose();
+               }
+               allFilesInDbDir =
+                       FileUtils.listFilesAndDirs(new File(dbPath), new 
AcceptAllFilter(), new AcceptAllFilter());
+
+               // just the root directory left
+               assertEquals(1, allFilesInDbDir.size());
+       }
+
+       @Test
+       public void testSharedIncrementalStateDeRegistration() throws Exception 
{
+               if (enableIncrementalCheckpointing) {
+                       AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+                       try {
+                               ValueStateDescriptor<String> kvId =
+                                       new ValueStateDescriptor<>("id", 
String.class, null);
+
+                               kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
+
+                               ValueState<String> state =
+                                       
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+                               Queue<IncrementalKeyedStateHandle> 
previousStateHandles = new LinkedList<>();
+                               SharedStateRegistry sharedStateRegistry = 
spy(new SharedStateRegistry());
+                               for (int checkpointId = 0; checkpointId < 3; 
++checkpointId) {
+
+                                       reset(sharedStateRegistry);
+
+                                       backend.setCurrentKey(checkpointId);
+                                       state.update("Hello-" + checkpointId);
+
+                                       RunnableFuture<KeyedStateHandle> 
snapshot = backend.snapshot(
+                                               checkpointId,
+                                               checkpointId,
+                                               createStreamFactory(),
+                                               
CheckpointOptions.forCheckpointWithDefaultLocation());
+
+                                       snapshot.run();
+
+                                       IncrementalKeyedStateHandle stateHandle 
= (IncrementalKeyedStateHandle) snapshot.get();
+                                       Map<StateHandleID, StreamStateHandle> 
sharedState =
+                                               new 
HashMap<>(stateHandle.getSharedState());
+
+                                       
stateHandle.registerSharedStates(sharedStateRegistry);
+
+                                       for (Map.Entry<StateHandleID, 
StreamStateHandle> e : sharedState.entrySet()) {
+                                               
verify(sharedStateRegistry).registerReference(
+                                                       
stateHandle.createSharedStateRegistryKeyFromFileName(e.getKey()),
+                                                       e.getValue());
+                                       }
+
+                                       previousStateHandles.add(stateHandle);
+                                       
backend.notifyCheckpointComplete(checkpointId);
+
+                                       
//-----------------------------------------------------------------
+
+                                       if (previousStateHandles.size() > 1) {
+                                               
checkRemove(previousStateHandles.remove(), sharedStateRegistry);
+                                       }
+                               }
+
+                               while (!previousStateHandles.isEmpty()) {
+
+                                       reset(sharedStateRegistry);
+
+                                       
checkRemove(previousStateHandles.remove(), sharedStateRegistry);
+                               }
+                       } finally {
+                               IOUtils.closeQuietly(backend);
+                               backend.dispose();
+                       }
+               }
+       }
+
+       private void checkRemove(IncrementalKeyedStateHandle remove, 
SharedStateRegistry registry) throws Exception {
+               for (StateHandleID id : remove.getSharedState().keySet()) {
+                       verify(registry, times(0)).unregisterReference(
+                               
remove.createSharedStateRegistryKeyFromFileName(id));
+               }
+
+               remove.discardState();
+
+               for (StateHandleID id : remove.getSharedState().keySet()) {
+                       verify(registry).unregisterReference(
+                               
remove.createSharedStateRegistryKeyFromFileName(id));
+               }
+       }
+
+       private void runStateUpdates() throws Exception{
+               for (int i = 50; i < 150; ++i) {
+                       if (i % 10 == 0) {
+                               Thread.sleep(1);
+                       }
+                       keyedStateBackend.setCurrentKey(i);
+                       testState1.update(4200 + i);
+                       testState2.update("S-" + (4200 + i));
+               }
+       }
+
+       private void verifyRocksObjectsReleased() {
+               //Ensure every RocksObject was closed exactly once
+               for (RocksObject rocksCloseable : allCreatedCloseables) {
+                       verify(rocksCloseable, times(1)).close();
+               }
+
+               assertNotNull(null, keyedStateBackend.db);
+               RocksDB spyDB = keyedStateBackend.db;
+
+               if (!enableIncrementalCheckpointing) {
+                       verify(spyDB, times(1)).getSnapshot();
+                       verify(spyDB, 
times(1)).releaseSnapshot(any(Snapshot.class));
+               }
+
+               keyedStateBackend.dispose();
+               verify(spyDB, times(1)).close();
+               assertEquals(null, keyedStateBackend.db);
+       }
+
+       private static class AcceptAllFilter implements IOFileFilter {
+               @Override
+               public boolean accept(File file) {
+                       return true;
+               }
+
+               @Override
+               public boolean accept(File file, String s) {
+                       return true;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
new file mode 100644
index 0000000..72c85ec
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDbMultiClassLoaderTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.RocksDB;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * This test validates that the RocksDB JNI library loading works properly
+ * in the presence of the RocksDB code being loaded dynamically via reflection.
+ * That can happen when RocksDB is in the user code JAR, or in certain test 
setups.
+ */
+public class RocksDbMultiClassLoaderTest {
+
+       @Rule
+       public final TemporaryFolder tmp = new TemporaryFolder();
+
+       @Test
+       public void testTwoSeparateClassLoaders() throws Exception {
+               // collect the libraries / class folders with RocksDB related 
code: the state backend and RocksDB itself
+               final URL codePath1 = 
RocksDBStateBackend.class.getProtectionDomain().getCodeSource().getLocation();
+               final URL codePath2 = 
RocksDB.class.getProtectionDomain().getCodeSource().getLocation();
+
+               final ClassLoader parent = getClass().getClassLoader();
+               final ClassLoader loader1 = 
FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, 
parent, new String[0]);
+               final ClassLoader loader2 = 
FlinkUserCodeClassLoaders.childFirst(new URL[] { codePath1, codePath2 }, 
parent, new String[0]);
+
+               final String className = RocksDBStateBackend.class.getName();
+
+               final Class<?> clazz1 = Class.forName(className, false, 
loader1);
+               final Class<?> clazz2 = Class.forName(className, false, 
loader2);
+               assertNotEquals("Test broken - the two reflectively loaded 
classes are equal", clazz1, clazz2);
+
+               final Object instance1 = 
clazz1.getConstructor(String.class).newInstance(tmp.newFolder().toURI().toString());
+               final Object instance2 = 
clazz2.getConstructor(String.class).newInstance(tmp.newFolder().toURI().toString());
+
+               final String tempDir = tmp.newFolder().getAbsolutePath();
+
+               final Method meth1 = 
clazz1.getDeclaredMethod("ensureRocksDBIsLoaded", String.class);
+               final Method meth2 = 
clazz2.getDeclaredMethod("ensureRocksDBIsLoaded", String.class);
+               meth1.setAccessible(true);
+               meth2.setAccessible(true);
+
+               // if all is well, these methods can both complete successfully
+               meth1.invoke(instance1, tempDir);
+               meth2.invoke(instance2, tempDir);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
new file mode 100644
index 0000000..670c355
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBListStatePerformanceTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.testutils.junit.RetryOnFailure;
+import org.apache.flink.testutils.junit.RetryRule;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test that validates that the performance of APIs of RocksDB's ListState is 
as expected.
+ *
+ * <p>Benchmarking:
+ *
+ * <p>Computer: MacbookPro (Mid 2015), Flash Storage, Processor 2.5GHz Intel 
Core i7, Memory 16GB 1600MHz DDR3
+ * Number of values added | time for add()   |  time for update() | perf 
improvement of update() over add()
+ * 10                                          236875 ns                       
17048 ns                        13.90x
+ * 50                                          312332 ns                       
14281 ns                        21.87x
+ * 100                                         393791 ns                       
18360 ns                        21.45x
+ * 500                                         978703 ns                       
55397 ns                        17.66x
+ * 1000                                                3044179 ns              
        89474 ns                        34.02x
+ * 5000                                                9247395 ns              
        305580 ns                       30.26x
+ * 10000                                       16416442 ns                     
605963 ns                       27.09x
+ * 50000                                       84311205 ns                     
5691288 ns                      14.81x
+ * 100000                                      195103310 ns            
12914182 ns                     15.11x
+ * 500000                                      1223141510 ns           
70595881 ns                     17.33x
+ *
+ * <p>In summary, update() API which pre-merges all values gives users 15-35x 
performance improvements.
+ * For most frequent use cases where there are a few hundreds to a few 
thousands values per key,
+ * users can get 30x - 35x performance improvement!
+ *
+ */
+public class RocksDBListStatePerformanceTest extends TestLogger {
+
+       private static final byte DELIMITER = ',';
+
+       @Rule
+       public final TemporaryFolder tmp = new TemporaryFolder();
+
+       @Rule
+       public final RetryRule retry = new RetryRule();
+
+       @Test(timeout = 2000)
+       @RetryOnFailure(times = 3)
+       public void testRocksDbListStateAPIs() throws Exception {
+               final File rocksDir = tmp.newFolder();
+
+               // ensure the RocksDB library is loaded to a distinct location 
each retry
+               
NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());
+
+               final String key1 = "key1";
+               final String key2 = "key2";
+               final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+               final byte[] keyBytes1 = key1.getBytes(StandardCharsets.UTF_8);
+               final byte[] keyBytes2 = key2.getBytes(StandardCharsets.UTF_8);
+               final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
+
+               // The number of values added to ListState. Can be changed for 
benchmarking
+               final int num = 10;
+
+               try (
+                       final Options options = new Options()
+                                       
.setCompactionStyle(CompactionStyle.LEVEL)
+                                       
.setLevelCompactionDynamicLevelBytes(true)
+                                       .setIncreaseParallelism(4)
+                                       .setUseFsync(false)
+                                       .setMaxOpenFiles(-1)
+                                       .setCreateIfMissing(true)
+                                       
.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
+
+                       final WriteOptions writeOptions = new WriteOptions()
+                                       .setSync(false)
+                                       .setDisableWAL(true);
+
+                       final RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath())) {
+
+                       // ----- add() API -----
+                       log.info("begin add");
+
+                       final long beginInsert1 = System.nanoTime();
+                       for (int i = 0; i < num; i++) {
+                               rocksDB.merge(writeOptions, keyBytes1, 
valueBytes);
+                       }
+                       final long endInsert1 = System.nanoTime();
+
+                       log.info("end add - duration: {} ns", (endInsert1 - 
beginInsert1));
+
+                       // ----- update() API -----
+
+                       List<byte[]> list = new ArrayList<>(num);
+                       for (int i = 0; i < num; i++) {
+                               list.add(valueBytes);
+                       }
+                       byte[] premerged = merge(list);
+
+                       log.info("begin update");
+
+                       final long beginInsert2 = System.nanoTime();
+                       rocksDB.merge(writeOptions, keyBytes2, premerged);
+                       final long endInsert2 = System.nanoTime();
+
+                       log.info("end update - duration: {} ns", (endInsert2 - 
beginInsert2));
+               }
+       }
+
+       /**
+        * Merge operands into a single value that can be put directly into 
RocksDB.
+        */
+       public static byte[] merge(List<byte[]> operands) {
+               if (operands == null || operands.size() == 0) {
+                       return null;
+               }
+
+               if (operands.size() == 1) {
+                       return operands.get(0);
+               }
+
+               int numBytes = 0;
+               for (byte[] arr : operands) {
+                       numBytes += arr.length + 1;
+               }
+               numBytes--;
+
+               byte[] result = new byte[numBytes];
+
+               System.arraycopy(operands.get(0), 0, result, 0, 
operands.get(0).length);
+
+               for (int i = 1, arrIndex = operands.get(0).length; i < 
operands.size(); i++) {
+                       result[arrIndex] = DELIMITER;
+                       arrIndex += 1;
+                       System.arraycopy(operands.get(i), 0, result, arrIndex, 
operands.get(i).length);
+                       arrIndex += operands.get(i).length;
+               }
+
+               return result;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
new file mode 100644
index 0000000..e05e7ae
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/benchmark/RocksDBPerformanceTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.benchmark;
+
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.core.memory.MemoryUtils;
+import org.apache.flink.testutils.junit.RetryOnFailure;
+import org.apache.flink.testutils.junit.RetryRule;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+import sun.misc.Unsafe;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+/**
+ * Test that validates that the performance of RocksDB is as expected.
+ * This test guards against the bug filed as 'FLINK-5756'
+ */
+public class RocksDBPerformanceTest extends TestLogger {
+
+       @Rule
+       public final TemporaryFolder tmp = new TemporaryFolder();
+
+       @Rule
+       public final RetryRule retry = new RetryRule();
+
+       private File rocksDir;
+       private Options options;
+       private WriteOptions writeOptions;
+
+       private final String key = "key";
+       private final String value = 
"abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";
+
+       private final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
+       private final byte[] valueBytes = 
value.getBytes(StandardCharsets.UTF_8);
+
+       @Before
+       public void init() throws IOException {
+               rocksDir = tmp.newFolder();
+
+               // ensure the RocksDB library is loaded to a distinct location 
each retry
+               
NativeLibraryLoader.getInstance().loadLibrary(rocksDir.getAbsolutePath());
+
+               options = new Options()
+                               .setCompactionStyle(CompactionStyle.LEVEL)
+                               .setLevelCompactionDynamicLevelBytes(true)
+                               .setIncreaseParallelism(4)
+                               .setUseFsync(false)
+                               .setMaxOpenFiles(-1)
+                               .setCreateIfMissing(true)
+                               
.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
+
+               writeOptions = new WriteOptions()
+                               .setSync(false)
+                               .setDisableWAL(true);
+       }
+
+       @Test(timeout = 2000)
+       @RetryOnFailure(times = 3)
+       public void testRocksDbMergePerformance() throws Exception {
+               final int num = 50000;
+
+               try (RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath())) {
+
+                       // ----- insert -----
+                       log.info("begin insert");
+
+                       final long beginInsert = System.nanoTime();
+                       for (int i = 0; i < num; i++) {
+                               rocksDB.merge(writeOptions, keyBytes, 
valueBytes);
+                       }
+                       final long endInsert = System.nanoTime();
+                       log.info("end insert - duration: {} ms", (endInsert - 
beginInsert) / 1_000_000);
+
+                       // ----- read (attempt 1) -----
+
+                       final byte[] resultHolder = new byte[num * 
(valueBytes.length + 2)];
+                       final long beginGet1 = System.nanoTime();
+                       rocksDB.get(keyBytes, resultHolder);
+                       final long endGet1 = System.nanoTime();
+
+                       log.info("end get - duration: {} ms", (endGet1 - 
beginGet1) / 1_000_000);
+
+                       // ----- read (attempt 2) -----
+
+                       final long beginGet2 = System.nanoTime();
+                       rocksDB.get(keyBytes, resultHolder);
+                       final long endGet2 = System.nanoTime();
+
+                       log.info("end get - duration: {} ms", (endGet2 - 
beginGet2) / 1_000_000);
+
+                       // ----- compact -----
+                       log.info("compacting...");
+                       final long beginCompact = System.nanoTime();
+                       rocksDB.compactRange();
+                       final long endCompact = System.nanoTime();
+
+                       log.info("end compaction - duration: {} ms", 
(endCompact - beginCompact) / 1_000_000);
+
+                       // ----- read (attempt 3) -----
+
+                       final long beginGet3 = System.nanoTime();
+                       rocksDB.get(keyBytes, resultHolder);
+                       final long endGet3 = System.nanoTime();
+
+                       log.info("end get - duration: {} ms", (endGet3 - 
beginGet3) / 1_000_000);
+               }
+       }
+
+       @Test(timeout = 2000)
+       @RetryOnFailure(times = 3)
+       public void testRocksDbRangeGetPerformance() throws Exception {
+               final int num = 50000;
+
+               try (RocksDB rocksDB = RocksDB.open(options, 
rocksDir.getAbsolutePath())) {
+
+                       final byte[] keyTemplate = Arrays.copyOf(keyBytes, 
keyBytes.length + 4);
+
+                       final Unsafe unsafe = MemoryUtils.UNSAFE;
+                       final long offset = 
unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;
+
+                       log.info("begin insert");
+
+                       final long beginInsert = System.nanoTime();
+                       for (int i = 0; i < num; i++) {
+                               unsafe.putInt(keyTemplate, offset, i);
+                               rocksDB.put(writeOptions, keyTemplate, 
valueBytes);
+                       }
+                       final long endInsert = System.nanoTime();
+                       log.info("end insert - duration: {} ms", (endInsert - 
beginInsert) / 1_000_000);
+
+                       @SuppressWarnings("MismatchedReadAndWriteOfArray")
+                       final byte[] resultHolder = new byte[num * 
valueBytes.length];
+
+                       final long beginGet = System.nanoTime();
+
+                       int pos = 0;
+
+                       try (final RocksIterator iterator = 
rocksDB.newIterator()) {
+                               // seek to start
+                               unsafe.putInt(keyTemplate, offset, 0);
+                               iterator.seek(keyTemplate);
+
+                               // iterate
+                               while (iterator.isValid() && 
samePrefix(keyBytes, iterator.key())) {
+                                       byte[] currValue = iterator.value();
+                                       System.arraycopy(currValue, 0, 
resultHolder, pos, currValue.length);
+                                       pos += currValue.length;
+                                       iterator.next();
+                               }
+                       }
+
+                       final long endGet = System.nanoTime();
+
+                       log.info("end get - duration: {} ms", (endGet - 
beginGet) / 1_000_000);
+               }
+       }
+
+       private static boolean samePrefix(byte[] prefix, byte[] key) {
+               for (int i = 0; i < prefix.length; i++) {
+                       if (prefix[i] != key [i]) {
+                               return false;
+                       }
+               }
+
+               return true;
+       }
+
+       @After
+       public void close() {
+               options.close();
+               writeOptions.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..881dc06
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-state-backends/pom.xml
----------------------------------------------------------------------
diff --git a/flink-state-backends/pom.xml b/flink-state-backends/pom.xml
new file mode 100644
index 0000000..c02ad84
--- /dev/null
+++ b/flink-state-backends/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-parent</artifactId>
+               <version>1.5-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+       <artifactId>flink-state-backends</artifactId>
+       <name>flink-state-backends</name>
+
+       <packaging>pom</packaging>
+
+       <modules>
+               <module>flink-statebackend-rocksdb</module>
+       </modules>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 9416200..5f0ebb4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,6 +73,7 @@ under the License.
                <module>flink-tests</module>
                <module>flink-end-to-end-tests</module>
                <module>flink-test-utils-parent</module>
+               <module>flink-state-backends</module>
                <module>flink-libraries</module>
                <module>flink-scala-shell</module>
                <module>flink-quickstart</module>

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/tools/travis_mvn_watchdog.sh
----------------------------------------------------------------------
diff --git a/tools/travis_mvn_watchdog.sh b/tools/travis_mvn_watchdog.sh
index 71b4115..82493ce 100755
--- a/tools/travis_mvn_watchdog.sh
+++ b/tools/travis_mvn_watchdog.sh
@@ -45,7 +45,7 @@ LOG4J_PROPERTIES=${HERE}/log4j-travis.properties
 
 MODULES_CORE="\
 flink-test-utils-parent/flink-test-utils,\
-flink-contrib/flink-statebackend-rocksdb,\
+flink-state-backends/flink-statebackend-rocksdb,\
 flink-clients,\
 flink-core,\
 flink-java,\

Reply via email to