Repository: flink
Updated Branches:
  refs/heads/master 3f4de57b1 -> 6642768ad


[FLINK-7460] [state backends] Close all ColumnFamilyHandles when restoring from 
rescaled incremental checkpoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca87bec4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca87bec4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca87bec4

Branch: refs/heads/master
Commit: ca87bec4f79c32c9f6cf7a4aa96866f6fac957e0
Parents: 3f4de57
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Mon Aug 14 14:01:03 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu Aug 24 17:17:39 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 453 ++++++++++---------
 .../state/RocksDBStateBackendTest.java          | 313 +++++++------
 .../runtime/state/StateBackendTestBase.java     | 244 +++++-----
 3 files changed, 546 insertions(+), 464 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ca87bec4/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 756cfdd..b7f386d 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -105,7 +105,9 @@ import java.io.ObjectInputStream;
 import java.io.OutputStream;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -138,6 +140,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        /** The name of the merge operator in RocksDB. Do not change except you 
know exactly what you do. */
        public static final String MERGE_OPERATOR_NAME = "stringappendtest";
 
+       /** Bytes for the name of the column decriptor for the default column 
family. */
+       public static final byte[] DEFAULT_COLUMN_FAMILY_NAME_BYTES = 
"default".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
        private final String operatorIdentifier;
 
        /** The column family options from the options factory. */
@@ -196,7 +201,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
 
        /** The identifier of the last completed checkpoint. */
-       private long lastCompletedCheckpointId = -1;
+       private long lastCompletedCheckpointId = -1L;
 
        /** Unique ID of this backend. */
        private UUID backendUID;
@@ -204,17 +209,17 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        private static final String SST_FILE_SUFFIX = ".sst";
 
        public RocksDBKeyedStateBackend(
-                       String operatorIdentifier,
-                       ClassLoader userCodeClassLoader,
-                       File instanceBasePath,
-                       DBOptions dbOptions,
-                       ColumnFamilyOptions columnFamilyOptions,
-                       TaskKvStateRegistry kvStateRegistry,
-                       TypeSerializer<K> keySerializer,
-                       int numberOfKeyGroups,
-                       KeyGroupRange keyGroupRange,
-                       ExecutionConfig executionConfig,
-                       boolean enableIncrementalCheckpointing
+               String operatorIdentifier,
+               ClassLoader userCodeClassLoader,
+               File instanceBasePath,
+               DBOptions dbOptions,
+               ColumnFamilyOptions columnFamilyOptions,
+               TaskKvStateRegistry kvStateRegistry,
+               TypeSerializer<K> keySerializer,
+               int numberOfKeyGroups,
+               KeyGroupRange keyGroupRange,
+               ExecutionConfig executionConfig,
+               boolean enableIncrementalCheckpointing
        ) throws IOException {
 
                super(kvStateRegistry, keySerializer, userCodeClassLoader, 
numberOfKeyGroups, keyGroupRange, executionConfig);
@@ -253,10 +258,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                this.restoredKvStateMetaInfos = new HashMap<>();
                this.materializedSstFiles = new TreeMap<>();
                this.backendUID = UUID.randomUUID();
-
-               LOG.debug("Setting initial backend ID in 
RocksDBKeyedStateBackend for operator {} to {}.",
-                       this.operatorIdentifier,
-                       this.backendUID);
+               LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
        }
 
        /**
@@ -277,7 +279,6 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                // DB is closed. So we start with the ones 
created by Flink...
                                for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData :
                                        kvStateInformation.values()) {
-
                                        IOUtils.closeQuietly(columnMetaData.f0);
                                }
 
@@ -328,10 +329,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         */
        @Override
        public RunnableFuture<KeyedStateHandle> snapshot(
-                       final long checkpointId,
-                       final long timestamp,
-                       final CheckpointStreamFactory streamFactory,
-                       CheckpointOptions checkpointOptions) throws Exception {
+               final long checkpointId,
+               final long timestamp,
+               final CheckpointStreamFactory streamFactory,
+               CheckpointOptions checkpointOptions) throws Exception {
 
                if (checkpointOptions.getCheckpointType() != 
CheckpointOptions.CheckpointType.SAVEPOINT &&
                        enableIncrementalCheckpointing) {
@@ -342,9 +343,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        }
 
        private RunnableFuture<KeyedStateHandle> snapshotIncrementally(
-                       final long checkpointId,
-                       final long checkpointTimestamp,
-                       final CheckpointStreamFactory checkpointStreamFactory) 
throws Exception {
+               final long checkpointId,
+               final long checkpointTimestamp,
+               final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
 
                final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
                        new RocksDBIncrementalSnapshotOperation<>(
@@ -361,7 +362,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        if (!hasRegisteredState()) {
                                if (LOG.isDebugEnabled()) {
                                        LOG.debug("Asynchronous RocksDB 
snapshot performed on empty keyed state at " +
-                                                       checkpointTimestamp + " 
. Returning null.");
+                                               checkpointTimestamp + " . 
Returning null.");
                                }
                                return DoneFuture.nullValue();
                        }
@@ -391,9 +392,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        }
 
        private RunnableFuture<KeyedStateHandle> snapshotFully(
-                       final long checkpointId,
-                       final long timestamp,
-                       final CheckpointStreamFactory streamFactory) throws 
Exception {
+               final long checkpointId,
+               final long timestamp,
+               final CheckpointStreamFactory streamFactory) throws Exception {
 
                long startTime = System.currentTimeMillis();
 
@@ -406,7 +407,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                if (!hasRegisteredState()) {
                                        if (LOG.isDebugEnabled()) {
                                                LOG.debug("Asynchronous RocksDB 
snapshot performed on empty keyed state at " + timestamp +
-                                                               " . Returning 
null.");
+                                                       " . Returning null.");
                                        }
                                        return DoneFuture.nullValue();
                                }
@@ -419,52 +420,52 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                // implementation of the async IO operation, based on FutureTask
                AbstractAsyncIOCallable<KeyedStateHandle, 
CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
-                               new AbstractAsyncIOCallable<KeyedStateHandle, 
CheckpointStreamFactory.CheckpointStateOutputStream>() {
+                       new AbstractAsyncIOCallable<KeyedStateHandle, 
CheckpointStreamFactory.CheckpointStateOutputStream>() {
 
-                                       @Override
-                                       public 
CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws 
Exception {
-                                               
snapshotOperation.openCheckpointStream();
-                                               return 
snapshotOperation.getOutStream();
-                                       }
+                               @Override
+                               public 
CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws 
Exception {
+                                       
snapshotOperation.openCheckpointStream();
+                                       return snapshotOperation.getOutStream();
+                               }
 
-                                       @Override
-                                       public KeyGroupsStateHandle 
performOperation() throws Exception {
-                                               long startTime = 
System.currentTimeMillis();
-                                               synchronized 
(asyncSnapshotLock) {
-                                                       try {
-                                                               // hold the db 
lock while operation on the db to guard us against async db disposal
-                                                               if (db == null) 
{
-                                                                       throw 
new IOException("RocksDB closed.");
-                                                               }
+                               @Override
+                               public KeyGroupsStateHandle performOperation() 
throws Exception {
+                                       long startTime = 
System.currentTimeMillis();
+                                       synchronized (asyncSnapshotLock) {
+                                               try {
+                                                       // hold the db lock 
while operation on the db to guard us against async db disposal
+                                                       if (db == null) {
+                                                               throw new 
IOException("RocksDB closed.");
+                                                       }
 
-                                                               
snapshotOperation.writeDBSnapshot();
+                                                       
snapshotOperation.writeDBSnapshot();
 
-                                                       } finally {
-                                                               
snapshotOperation.closeCheckpointStream();
-                                                       }
+                                               } finally {
+                                                       
snapshotOperation.closeCheckpointStream();
                                                }
+                                       }
 
-                                               LOG.info("Asynchronous RocksDB 
snapshot ({}, asynchronous part) in thread {} took {} ms.",
-                                                       streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - startTime));
+                                       LOG.info("Asynchronous RocksDB snapshot 
({}, asynchronous part) in thread {} took {} ms.",
+                                               streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - startTime));
 
-                                               return 
snapshotOperation.getSnapshotResultStateHandle();
-                                       }
+                                       return 
snapshotOperation.getSnapshotResultStateHandle();
+                               }
 
-                                       private void 
releaseSnapshotOperationResources(boolean canceled) {
-                                               // hold the db lock while 
operation on the db to guard us against async db disposal
-                                               synchronized 
(asyncSnapshotLock) {
-                                                       
snapshotOperation.releaseSnapshotResources(canceled);
-                                               }
+                               private void 
releaseSnapshotOperationResources(boolean canceled) {
+                                       // hold the db lock while operation on 
the db to guard us against async db disposal
+                                       synchronized (asyncSnapshotLock) {
+                                               
snapshotOperation.releaseSnapshotResources(canceled);
                                        }
+                               }
 
-                                       @Override
-                                       public void done(boolean canceled) {
-                                               
releaseSnapshotOperationResources(canceled);
-                                       }
-                               };
+                               @Override
+                               public void done(boolean canceled) {
+                                       
releaseSnapshotOperationResources(canceled);
+                               }
+                       };
 
                LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", 
synchronous part) in thread " +
-                               Thread.currentThread() + " took " + 
(System.currentTimeMillis() - startTime) + " ms.");
+                       Thread.currentThread() + " took " + 
(System.currentTimeMillis() - startTime) + " ms.");
 
                return AsyncStoppableTaskWithCallback.from(ioCallable);
        }
@@ -493,8 +494,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private KeyGroupsStateHandle snapshotResultStateHandle;
 
                RocksDBFullSnapshotOperation(
-                               RocksDBKeyedStateBackend<K> stateBackend,
-                               CheckpointStreamFactory 
checkpointStreamFactory) {
+                       RocksDBKeyedStateBackend<K> stateBackend,
+                       CheckpointStreamFactory checkpointStreamFactory) {
 
                        this.stateBackend = stateBackend;
                        this.checkpointStreamFactory = checkpointStreamFactory;
@@ -523,7 +524,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                public void openCheckpointStream() throws Exception {
                        Preconditions.checkArgument(outStream == null, "Output 
stream for snapshot is already set.");
                        outStream = checkpointStreamFactory.
-                                       
createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp);
+                               createCheckpointStateOutputStream(checkpointId, 
checkpointTimeStamp);
                        
stateBackend.cancelStreamRegistry.registerClosable(outStream);
                        outputView = new DataOutputViewStreamWrapper(outStream);
                }
@@ -615,11 +616,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private void writeKVStateMetaData() throws IOException {
 
                        List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> metaInfoSnapshots =
-                                       new 
ArrayList<>(stateBackend.kvStateInformation.size());
+                               new 
ArrayList<>(stateBackend.kvStateInformation.size());
 
                        int kvStateId = 0;
                        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
-                                       
stateBackend.kvStateInformation.entrySet()) {
+                               stateBackend.kvStateInformation.entrySet()) {
 
                                
metaInfoSnapshots.add(column.getValue().f1.snapshot());
 
@@ -628,7 +629,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                readOptions.setSnapshot(snapshot);
 
                                kvStateIterators.add(
-                                               new 
Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), 
kvStateId));
+                                       new 
Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), 
kvStateId));
 
                                ++kvStateId;
                        }
@@ -797,10 +798,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private StreamStateHandle metaStateHandle = null;
 
                private RocksDBIncrementalSnapshotOperation(
-                               RocksDBKeyedStateBackend<K> stateBackend,
-                               CheckpointStreamFactory checkpointStreamFactory,
-                               long checkpointId,
-                               long checkpointTimestamp) {
+                       RocksDBKeyedStateBackend<K> stateBackend,
+                       CheckpointStreamFactory checkpointStreamFactory,
+                       long checkpointId,
+                       long checkpointTimestamp) {
 
                        this.stateBackend = stateBackend;
                        this.checkpointStreamFactory = checkpointStreamFactory;
@@ -886,20 +887,14 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                void takeSnapshot() throws Exception {
                        assert 
(Thread.holdsLock(stateBackend.asyncSnapshotLock));
 
-                       final long lastCompletedCheckpoint;
-
                        // use the last completed checkpoint as the comparison 
base.
                        synchronized (stateBackend.materializedSstFiles) {
-                               lastCompletedCheckpoint = 
stateBackend.lastCompletedCheckpointId;
-                               baseSstFiles = 
stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
+                               baseSstFiles = 
stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
                        }
 
-                       LOG.trace("Taking incremental snapshot for checkpoint 
{}. Snapshot is based on last completed checkpoint {} " +
-                               "assuming the following (shared) files as base: 
{}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
-
                        // save meta data
                        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
-                                       : 
stateBackend.kvStateInformation.entrySet()) {
+                               : stateBackend.kvStateInformation.entrySet()) {
                                
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
                        }
 
@@ -1054,47 +1049,39 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        }
 
        private void createDB() throws IOException {
-               db = openDB(instanceRocksDBPath.getAbsolutePath(),
-                       new ArrayList<ColumnFamilyDescriptor>(),
-                       null);
+               List<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>(1);
+               this.db = openDB(instanceRocksDBPath.getAbsolutePath(), 
Collections.emptyList(), columnFamilyHandles);
+               this.defaultColumnFamily = columnFamilyHandles.get(0);
        }
 
        private RocksDB openDB(
-                       String path,
-                       List<ColumnFamilyDescriptor> 
stateColumnFamilyDescriptors,
-                       List<ColumnFamilyHandle> stateColumnFamilyHandles) 
throws IOException {
+               String path,
+               List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
+               List<ColumnFamilyHandle> stateColumnFamilyHandles) throws 
IOException {
 
-               List<ColumnFamilyDescriptor> columnFamilyDescriptors = new 
ArrayList<>(stateColumnFamilyDescriptors);
+               List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+                       new ArrayList<>(1 + 
stateColumnFamilyDescriptors.size());
 
-               // we add the required descriptor for the default CF in last 
position.
-               columnFamilyDescriptors.add(
-                       new ColumnFamilyDescriptor(
-                               
"default".getBytes(ConfigConstants.DEFAULT_CHARSET), columnOptions));
+               columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
 
-               List<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>(columnFamilyDescriptors.size());
+               // we add the required descriptor for the default CF in last 
position.
+               columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));
 
                RocksDB db;
 
                try {
                        db = RocksDB.open(
-                                       Preconditions.checkNotNull(dbOptions),
-                                       Preconditions.checkNotNull(path),
-                                       columnFamilyDescriptors,
-                                       columnFamilyHandles);
+                               Preconditions.checkNotNull(dbOptions),
+                               Preconditions.checkNotNull(path),
+                               columnFamilyDescriptors,
+                               stateColumnFamilyHandles);
                } catch (RocksDBException e) {
                        throw new IOException("Error while opening RocksDB 
instance.", e);
                }
 
-               final int defaultColumnFamilyIndex = columnFamilyHandles.size() 
- 1;
-
-               // extract the default column family.
-               defaultColumnFamily = 
columnFamilyHandles.get(defaultColumnFamilyIndex);
-
-               if (stateColumnFamilyHandles != null) {
-                       // return all CFs except the default CF which is kept 
separately because it is not used in Flink operations.
-                       stateColumnFamilyHandles.addAll(
-                               columnFamilyHandles.subList(0, 
defaultColumnFamilyIndex));
-               }
+               // requested + default CF
+               Preconditions.checkState(1 + 
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
+                       "Not all requested column family handles have been 
created");
 
                return db;
        }
@@ -1135,7 +1122,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 * @throws RocksDBException
                 */
                public void doRestore(Collection<KeyedStateHandle> 
keyedStateHandles)
-                               throws IOException, StateMigrationException, 
ClassNotFoundException, RocksDBException {
+                       throws IOException, StateMigrationException, 
ClassNotFoundException, RocksDBException {
 
                        rocksDBKeyedStateBackend.createDB();
 
@@ -1144,8 +1131,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                        if (!(keyedStateHandle instanceof 
KeyGroupsStateHandle)) {
                                                throw new 
IllegalStateException("Unexpected state handle type, " +
-                                                               "expected: " + 
KeyGroupsStateHandle.class +
-                                                               ", but found: " 
+ keyedStateHandle.getClass());
+                                                       "expected: " + 
KeyGroupsStateHandle.class +
+                                                       ", but found: " + 
keyedStateHandle.getClass());
                                        }
                                        this.currentKeyGroupsStateHandle = 
(KeyGroupsStateHandle) keyedStateHandle;
                                        restoreKeyGroupsInStateHandle();
@@ -1161,7 +1148,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                 * @throws ClassNotFoundException
                 */
                private void restoreKeyGroupsInStateHandle()
-                               throws IOException, StateMigrationException, 
RocksDBException, ClassNotFoundException {
+                       throws IOException, StateMigrationException, 
RocksDBException, ClassNotFoundException {
                        try {
                                currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
                                
rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
@@ -1186,17 +1173,17 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                private void restoreKVStateMetaData() throws IOException, 
StateMigrationException, RocksDBException {
 
                        KeyedBackendSerializationProxy<K> serializationProxy =
-                                       new 
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
+                               new 
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
 
                        serializationProxy.read(currentStateHandleInView);
 
                        // check for key serializer compatibility; this also 
reconfigures the
                        // key serializer to be compatible, if it is required 
and is possible
                        if (CompatibilityUtil.resolveCompatibilityResult(
-                                       serializationProxy.getKeySerializer(),
-                                       UnloadableDummyTypeSerializer.class,
-                                       
serializationProxy.getKeySerializerConfigSnapshot(),
-                                       rocksDBKeyedStateBackend.keySerializer)
+                               serializationProxy.getKeySerializer(),
+                               UnloadableDummyTypeSerializer.class,
+                               
serializationProxy.getKeySerializerConfigSnapshot(),
+                               rocksDBKeyedStateBackend.keySerializer)
                                .isRequiresMigration()) {
 
                                // TODO replace with state migration; note that 
key hash codes need to remain the same after migration
@@ -1208,7 +1195,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                SnappyStreamCompressionDecorator.INSTANCE : 
UncompressedStreamCompressionDecorator.INSTANCE;
 
                        List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> restoredMetaInfos =
-                                       
serializationProxy.getStateMetaInfoSnapshots();
+                               serializationProxy.getStateMetaInfoSnapshots();
                        currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
                        //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = 
new HashMap<>(restoredMetaInfos.size());
 
@@ -1218,22 +1205,24 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
 
                                if (registeredColumn == null) {
+                                       byte[] nameBytes = 
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
+
                                        ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
-                                               
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
+                                               nameBytes,
                                                
rocksDBKeyedStateBackend.columnOptions);
 
                                        RegisteredKeyedBackendStateMetaInfo<?, 
?> stateMetaInfo =
-                                                       new 
RegisteredKeyedBackendStateMetaInfo<>(
-                                                               
restoredMetaInfo.getStateType(),
-                                                               
restoredMetaInfo.getName(),
-                                                               
restoredMetaInfo.getNamespaceSerializer(),
-                                                               
restoredMetaInfo.getStateSerializer());
+                                               new 
RegisteredKeyedBackendStateMetaInfo<>(
+                                                       
restoredMetaInfo.getStateType(),
+                                                       
restoredMetaInfo.getName(),
+                                                       
restoredMetaInfo.getNamespaceSerializer(),
+                                                       
restoredMetaInfo.getStateSerializer());
 
                                        
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(),
 restoredMetaInfo);
 
                                        ColumnFamilyHandle columnFamily = 
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
 
-                                       registeredColumn = new 
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, 
?>>(columnFamily, stateMetaInfo);
+                                       registeredColumn = new 
Tuple2<>(columnFamily, stateMetaInfo);
                                        
rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), 
registeredColumn);
 
                                } else {
@@ -1303,7 +1292,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
 
                private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> readMetaData(
-                               StreamStateHandle metaStateHandle) throws 
Exception {
+                       StreamStateHandle metaStateHandle) throws Exception {
 
                        FSDataInputStream inputStream = null;
 
@@ -1319,10 +1308,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                // check for key serializer compatibility; this 
also reconfigures the
                                // key serializer to be compatible, if it is 
required and is possible
                                if 
(CompatibilityUtil.resolveCompatibilityResult(
-                                               
serializationProxy.getKeySerializer(),
-                                               
UnloadableDummyTypeSerializer.class,
-                                               
serializationProxy.getKeySerializerConfigSnapshot(),
-                                               stateBackend.keySerializer)
+                                       serializationProxy.getKeySerializer(),
+                                       UnloadableDummyTypeSerializer.class,
+                                       
serializationProxy.getKeySerializerConfigSnapshot(),
+                                       stateBackend.keySerializer)
                                        .isRequiresMigration()) {
 
                                        // TODO replace with state migration; 
note that key hash codes need to remain the same after migration
@@ -1340,8 +1329,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
 
                private void readStateData(
-                               Path restoreFilePath,
-                               StreamStateHandle remoteFileHandle) throws 
IOException {
+                       Path restoreFilePath,
+                       StreamStateHandle remoteFileHandle) throws IOException {
 
                        FileSystem restoreFileSystem = 
restoreFilePath.getFileSystem();
 
@@ -1378,8 +1367,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                }
 
                private void restoreInstance(
-                               IncrementalKeyedStateHandle restoreStateHandle,
-                               boolean hasExtraKeys) throws Exception {
+                       IncrementalKeyedStateHandle restoreStateHandle,
+                       boolean hasExtraKeys) throws Exception {
 
                        // read state data
                        Path restoreInstancePath = new Path(
@@ -1399,7 +1388,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
=
                                        
readMetaData(restoreStateHandle.getMetaStateHandle());
 
-                               List<ColumnFamilyDescriptor> 
columnFamilyDescriptors = new ArrayList<>();
+                               List<ColumnFamilyDescriptor> 
columnFamilyDescriptors =
+                                       new ArrayList<>(1 + 
stateMetaInfoSnapshots.size());
 
                                for 
(RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : 
stateMetaInfoSnapshots) {
 
@@ -1413,69 +1403,78 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                if (hasExtraKeys) {
 
-                                       List<ColumnFamilyHandle> 
columnFamilyHandles = new ArrayList<>();
+                                       List<ColumnFamilyHandle> 
columnFamilyHandles =
+                                               new ArrayList<>(1 + 
columnFamilyDescriptors.size());
 
                                        try (RocksDB restoreDb = 
stateBackend.openDB(
-                                                       
restoreInstancePath.getPath(),
-                                                       columnFamilyDescriptors,
-                                                       columnFamilyHandles)) {
-
-                                               for (int i = 0; i < 
columnFamilyHandles.size(); ++i) {
-                                                       ColumnFamilyHandle 
columnFamilyHandle = columnFamilyHandles.get(i);
-                                                       ColumnFamilyDescriptor 
columnFamilyDescriptor = columnFamilyDescriptors.get(i);
-                                                       
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = 
stateMetaInfoSnapshots.get(i);
-
-                                                       
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> 
registeredStateMetaInfoEntry =
-                                                               
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
-
-                                                       if (null == 
registeredStateMetaInfoEntry) {
-
-                                                               
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
-                                                                       new 
RegisteredKeyedBackendStateMetaInfo<>(
-                                                                               
stateMetaInfoSnapshot.getStateType(),
+                                               restoreInstancePath.getPath(),
+                                               columnFamilyDescriptors,
+                                               columnFamilyHandles)) {
+
+                                               try {
+                                                       // iterating only the 
requested descriptors automatically skips the default column family handle
+                                                       for (int i = 0; i < 
columnFamilyDescriptors.size(); ++i) {
+                                                               
ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
+                                                               
ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i);
+                                                               
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = 
stateMetaInfoSnapshots.get(i);
+
+                                                               
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> 
registeredStateMetaInfoEntry =
+                                                                       
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
+
+                                                               if (null == 
registeredStateMetaInfoEntry) {
+
+                                                                       
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
+                                                                               
new RegisteredKeyedBackendStateMetaInfo<>(
+                                                                               
        stateMetaInfoSnapshot.getStateType(),
+                                                                               
        stateMetaInfoSnapshot.getName(),
+                                                                               
        stateMetaInfoSnapshot.getNamespaceSerializer(),
+                                                                               
        stateMetaInfoSnapshot.getStateSerializer());
+
+                                                                       
registeredStateMetaInfoEntry =
+                                                                               
new Tuple2<>(
+                                                                               
        stateBackend.db.createColumnFamily(columnFamilyDescriptor),
+                                                                               
        stateMetaInfo);
+
+                                                                       
stateBackend.kvStateInformation.put(
                                                                                
stateMetaInfoSnapshot.getName(),
-                                                                               
stateMetaInfoSnapshot.getNamespaceSerializer(),
-                                                                               
stateMetaInfoSnapshot.getStateSerializer());
-
-                                                               
registeredStateMetaInfoEntry =
-                                                                       new 
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(
-                                                                               
stateBackend.db.createColumnFamily(columnFamilyDescriptor),
-                                                                               
stateMetaInfo);
+                                                                               
registeredStateMetaInfoEntry);
+                                                               }
 
-                                                               
stateBackend.kvStateInformation.put(
-                                                                       
stateMetaInfoSnapshot.getName(),
-                                                                       
registeredStateMetaInfoEntry);
-                                                       }
+                                                               
ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
 
-                                                       ColumnFamilyHandle 
targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
+                                                               try 
(RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) {
 
-                                                       try (RocksIterator 
iterator = restoreDb.newIterator(columnFamilyHandle)) {
+                                                                       int 
startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
+                                                                       byte[] 
startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
+                                                                       for 
(int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
+                                                                               
startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> 
((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
+                                                                       }
 
-                                                               int 
startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
-                                                               byte[] 
startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
-                                                               for (int j = 0; 
j < stateBackend.keyGroupPrefixBytes; ++j) {
-                                                                       
startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> 
((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
-                                                               }
+                                                                       
iterator.seek(startKeyGroupPrefixBytes);
 
-                                                               
iterator.seek(startKeyGroupPrefixBytes);
+                                                                       while 
(iterator.isValid()) {
 
-                                                               while 
(iterator.isValid()) {
+                                                                               
int keyGroup = 0;
+                                                                               
for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
+                                                                               
        keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
+                                                                               
}
 
-                                                                       int 
keyGroup = 0;
-                                                                       for 
(int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
-                                                                               
keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
-                                                                       }
+                                                                               
if (stateBackend.keyGroupRange.contains(keyGroup)) {
+                                                                               
        stateBackend.db.put(targetColumnFamilyHandle,
+                                                                               
                iterator.key(), iterator.value());
+                                                                               
}
 
-                                                                       if 
(stateBackend.keyGroupRange.contains(keyGroup)) {
-                                                                               
stateBackend.db.put(targetColumnFamilyHandle,
-                                                                               
        iterator.key(), iterator.value());
+                                                                               
iterator.next();
                                                                        }
-
-                                                                       
iterator.next();
-                                                               }
+                                                               } // releases 
native iterator resources
+                                                       }
+                                               } finally {
+                                                       //release native tmp db 
column family resources
+                                                       for (ColumnFamilyHandle 
columnFamilyHandle : columnFamilyHandles) {
+                                                               
IOUtils.closeQuietly(columnFamilyHandle);
                                                        }
                                                }
-                                       }
+                                       } // releases native tmp db resources
                                } else {
                                        // pick up again the old backend id, so 
the we can reference existing state
                                        stateBackend.backendUID = 
restoreStateHandle.getBackendIdentifier();
@@ -1491,11 +1490,16 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        
createFileHardLinksInRestorePath(sstFiles, restoreInstancePath);
                                        
createFileHardLinksInRestorePath(miscFiles, restoreInstancePath);
 
-                                       List<ColumnFamilyHandle> 
columnFamilyHandles = new ArrayList<>();
+                                       List<ColumnFamilyHandle> 
columnFamilyHandles =
+                                               new ArrayList<>(1 + 
columnFamilyDescriptors.size());
+
                                        stateBackend.db = stateBackend.openDB(
                                                
stateBackend.instanceRocksDBPath.getAbsolutePath(),
                                                columnFamilyDescriptors, 
columnFamilyHandles);
 
+                                       // extract and store the default column 
family which is located at the last index
+                                       stateBackend.defaultColumnFamily = 
columnFamilyHandles.remove(columnFamilyHandles.size() - 1);
+
                                        for (int i = 0; i < 
columnFamilyDescriptors.size(); ++i) {
                                                
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = 
stateMetaInfoSnapshots.get(i);
 
@@ -1509,8 +1513,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                                                
stateBackend.kvStateInformation.put(
                                                        
stateMetaInfoSnapshot.getName(),
-                                                       new 
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(
-                                                               
columnFamilyHandle, stateMetaInfo));
+                                                       new 
Tuple2<>(columnFamilyHandle, stateMetaInfo));
                                        }
 
                                        // use the restore sst files as the 
base for succeeding checkpoints
@@ -1590,10 +1593,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
         */
        @SuppressWarnings("rawtypes, unchecked")
        protected <N, S> ColumnFamilyHandle getColumnFamily(
-                       StateDescriptor<?, S> descriptor, TypeSerializer<N> 
namespaceSerializer) throws IOException, StateMigrationException {
+               StateDescriptor<?, S> descriptor, TypeSerializer<N> 
namespaceSerializer) throws IOException, StateMigrationException {
 
                Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
-                               kvStateInformation.get(descriptor.getName());
+                       kvStateInformation.get(descriptor.getName());
 
                RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
                        descriptor.getType(),
@@ -1625,16 +1628,16 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        // check compatibility results to determine if state 
migration is required
                        CompatibilityResult<N> namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
-                                       
restoredMetaInfo.getNamespaceSerializer(),
-                                       MigrationNamespaceSerializerProxy.class,
-                                       
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
-                                       newMetaInfo.getNamespaceSerializer());
+                               restoredMetaInfo.getNamespaceSerializer(),
+                               MigrationNamespaceSerializerProxy.class,
+                               
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
+                               newMetaInfo.getNamespaceSerializer());
 
                        CompatibilityResult<S> stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
-                                       restoredMetaInfo.getStateSerializer(),
-                                       UnloadableDummyTypeSerializer.class,
-                                       
restoredMetaInfo.getStateSerializerConfigSnapshot(),
-                                       newMetaInfo.getStateSerializer());
+                               restoredMetaInfo.getStateSerializer(),
+                               UnloadableDummyTypeSerializer.class,
+                               
restoredMetaInfo.getStateSerializerConfigSnapshot(),
+                               newMetaInfo.getStateSerializer());
 
                        if (!namespaceCompatibility.isRequiresMigration() && 
!stateCompatibility.isRequiresMigration()) {
                                stateInfo.f1 = newMetaInfo;
@@ -1645,25 +1648,31 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        }
                }
 
-               ColumnFamilyDescriptor columnDescriptor = new 
ColumnFamilyDescriptor(
-                               
descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), columnOptions);
+               byte[] nameBytes = 
descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
+               
Preconditions.checkState(!Arrays.equals(DEFAULT_COLUMN_FAMILY_NAME_BYTES, 
nameBytes),
+                       "The chosen state name 'default' collides with the name 
of the default column family!");
+
+               ColumnFamilyDescriptor columnDescriptor = new 
ColumnFamilyDescriptor(nameBytes, columnOptions);
+
+               final ColumnFamilyHandle columnFamily;
 
                try {
-                       ColumnFamilyHandle columnFamily = 
db.createColumnFamily(columnDescriptor);
-                       Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<N, S>> tuple =
-                                       new Tuple2<>(columnFamily, newMetaInfo);
-                       Map rawAccess = kvStateInformation;
-                       rawAccess.put(descriptor.getName(), tuple);
-                       return columnFamily;
+                       columnFamily = db.createColumnFamily(columnDescriptor);
                } catch (RocksDBException e) {
                        throw new IOException("Error creating 
ColumnFamilyHandle.", e);
                }
+
+               Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<N, S>> tuple =
+                       new Tuple2<>(columnFamily, newMetaInfo);
+               Map rawAccess = kvStateInformation;
+               rawAccess.put(descriptor.getName(), tuple);
+               return columnFamily;
        }
 
        @Override
        protected <N, T> InternalValueState<N, T> createValueState(
-                       TypeSerializer<N> namespaceSerializer,
-                       ValueStateDescriptor<T> stateDesc) throws Exception {
+               TypeSerializer<N> namespaceSerializer,
+               ValueStateDescriptor<T> stateDesc) throws Exception {
 
                ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
 
@@ -1672,8 +1681,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
        @Override
        protected <N, T> InternalListState<N, T> createListState(
-                       TypeSerializer<N> namespaceSerializer,
-                       ListStateDescriptor<T> stateDesc) throws Exception {
+               TypeSerializer<N> namespaceSerializer,
+               ListStateDescriptor<T> stateDesc) throws Exception {
 
                ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
 
@@ -1682,8 +1691,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
        @Override
        protected <N, T> InternalReducingState<N, T> createReducingState(
-                       TypeSerializer<N> namespaceSerializer,
-                       ReducingStateDescriptor<T> stateDesc) throws Exception {
+               TypeSerializer<N> namespaceSerializer,
+               ReducingStateDescriptor<T> stateDesc) throws Exception {
 
                ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
 
@@ -1692,8 +1701,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
        @Override
        protected <N, T, ACC, R> InternalAggregatingState<N, T, R> 
createAggregatingState(
-                       TypeSerializer<N> namespaceSerializer,
-                       AggregatingStateDescriptor<T, ACC, R> stateDesc) throws 
Exception {
+               TypeSerializer<N> namespaceSerializer,
+               AggregatingStateDescriptor<T, ACC, R> stateDesc) throws 
Exception {
 
                ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
                return new RocksDBAggregatingState<>(columnFamily, 
namespaceSerializer, stateDesc, this);
@@ -1701,8 +1710,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
        @Override
        protected <N, T, ACC> InternalFoldingState<N, T, ACC> 
createFoldingState(
-                       TypeSerializer<N> namespaceSerializer,
-                       FoldingStateDescriptor<T, ACC> stateDesc) throws 
Exception {
+               TypeSerializer<N> namespaceSerializer,
+               FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
 
                ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
 
@@ -1711,7 +1720,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
        @Override
        protected <N, UK, UV> InternalMapState<N, UK, UV> 
createMapState(TypeSerializer<N> namespaceSerializer,
-                       MapStateDescriptor<UK, UV> stateDesc) throws Exception {
+                                                                        
MapStateDescriptor<UK, UV> stateDesc) throws Exception {
                ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
 
                return new RocksDBMapState<>(columnFamily, namespaceSerializer, 
stateDesc, this);
@@ -1784,7 +1793,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                        @Override
                                        public int compare(MergeIterator o1, 
MergeIterator o2) {
                                                int arrayCmpRes = 
compareKeyGroupsForByteArrays(
-                                                               o1.currentKey, 
o2.currentKey, currentBytes);
+                                                       o1.currentKey, 
o2.currentKey, currentBytes);
                                                return arrayCmpRes == 0 ? 
o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
                                        }
                                });
@@ -1799,7 +1808,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
 
                        if (kvStateIterators.size() > 0) {
                                PriorityQueue<MergeIterator> 
iteratorPriorityQueue =
-                                               new 
PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
+                                       new 
PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
 
                                for (Tuple2<RocksIterator, Integer> 
rocksIteratorWithKVStateId : kvStateIterators) {
                                        final RocksIterator rocksIterator = 
rocksIteratorWithKVStateId.f0;
@@ -1968,8 +1977,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                KeyedStateHandle keyedStateHandle = 
restoreState.iterator().next();
                if (!(keyedStateHandle instanceof 
MigrationKeyGroupStateHandle)) {
                        throw new IllegalStateException("Unexpected state 
handle type, " +
-                                       "expected: " + 
MigrationKeyGroupStateHandle.class +
-                                       ", but found: " + 
keyedStateHandle.getClass());
+                               "expected: " + 
MigrationKeyGroupStateHandle.class +
+                               ", but found: " + keyedStateHandle.getClass());
                }
 
                MigrationKeyGroupStateHandle keyGroupStateHandle = 
(MigrationKeyGroupStateHandle) keyedStateHandle;
@@ -1989,8 +1998,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        byte mappingByte = inputView.readByte();
 
                        ObjectInputStream ooIn =
-                                       new 
InstantiationUtil.ClassLoaderObjectInputStream(
-                                                       new 
DataInputViewStream(inputView), userCodeClassLoader);
+                               new 
InstantiationUtil.ClassLoaderObjectInputStream(
+                                       new DataInputViewStream(inputView), 
userCodeClassLoader);
 
                        StateDescriptor<?, ?> stateDescriptor = 
(StateDescriptor<?, ?>) ooIn.readObject();
 
@@ -2015,7 +2024,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        while (true) {
                                byte mappingByte = inputView.readByte();
                                ColumnFamilyHandle handle = getColumnFamily(
-                                               
columnFamilyMapping.get(mappingByte), 
MigrationNamespaceSerializerProxy.INSTANCE);
+                                       columnFamilyMapping.get(mappingByte), 
MigrationNamespaceSerializerProxy.INSTANCE);
 
                                byte[] keyAndNamespace = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ca87bec4/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 991e0d4..08d661c 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -42,7 +42,9 @@ import 
org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.filefilter.IOFileFilter;
+import org.junit.After;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -122,6 +124,22 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
                return backend;
        }
 
+       // small safety net for instance cleanups, so that no native objects 
are left
+       @After
+       public void cleanupRocksDB() {
+               if (keyedStateBackend != null) {
+                       IOUtils.closeQuietly(keyedStateBackend);
+                       keyedStateBackend.dispose();
+               }
+
+               if (allCreatedCloseables != null) {
+                       for (RocksObject rocksCloseable : allCreatedCloseables) 
{
+                               verify(rocksCloseable, times(1)).close();
+                       }
+                       allCreatedCloseables = null;
+               }
+       }
+
        public void setupRocksKeyedStateBackend() throws Exception {
 
                blocker = new OneShotLatch();
@@ -238,149 +256,186 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
 
        @Test
        public void testCorrectMergeOperatorSet() throws IOException {
-               ColumnFamilyOptions columnFamilyOptions = 
mock(ColumnFamilyOptions.class);
-
-               try (RocksDBKeyedStateBackend<Integer> test = new 
RocksDBKeyedStateBackend<>(
-                       "test",
-                       Thread.currentThread().getContextClassLoader(),
-                       tempFolder.newFolder(),
-                       mock(DBOptions.class),
-                       columnFamilyOptions,
-                       mock(TaskKvStateRegistry.class),
-                       IntSerializer.INSTANCE,
-                       1,
-                       new KeyGroupRange(0, 0),
-                       new ExecutionConfig(),
-                       enableIncrementalCheckpointing)) {
+
+               final ColumnFamilyOptions columnFamilyOptions = spy(new 
ColumnFamilyOptions());
+               RocksDBKeyedStateBackend<Integer> test = null;
+               try {
+                       test = new RocksDBKeyedStateBackend<>(
+                               "test",
+                               Thread.currentThread().getContextClassLoader(),
+                               tempFolder.newFolder(),
+                               mock(DBOptions.class),
+                               columnFamilyOptions,
+                               mock(TaskKvStateRegistry.class),
+                               IntSerializer.INSTANCE,
+                               1,
+                               new KeyGroupRange(0, 0),
+                               new ExecutionConfig(),
+                               enableIncrementalCheckpointing);
 
                        verify(columnFamilyOptions, Mockito.times(1))
                                
.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
+               } finally {
+                       if (test != null) {
+                               IOUtils.closeQuietly(test);
+                               test.dispose();
+                       }
+                       columnFamilyOptions.close();
                }
        }
 
        @Test
        public void testReleasingSnapshotAfterBackendClosed() throws Exception {
                setupRocksKeyedStateBackend();
-               RunnableFuture<KeyedStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory,
-                       CheckpointOptions.forFullCheckpoint());
 
-               RocksDB spyDB = keyedStateBackend.db;
+               try {
+                       RunnableFuture<KeyedStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory,
+                               CheckpointOptions.forFullCheckpoint());
 
-               if (!enableIncrementalCheckpointing) {
-                       verify(spyDB, times(1)).getSnapshot();
-                       verify(spyDB, 
times(0)).releaseSnapshot(any(Snapshot.class));
-               }
+                       RocksDB spyDB = keyedStateBackend.db;
 
-               this.keyedStateBackend.dispose();
-               verify(spyDB, times(1)).close();
-               assertEquals(null, keyedStateBackend.db);
+                       if (!enableIncrementalCheckpointing) {
+                               verify(spyDB, times(1)).getSnapshot();
+                               verify(spyDB, 
times(0)).releaseSnapshot(any(Snapshot.class));
+                       }
 
-               //Ensure every RocksObjects not closed yet
-               for (RocksObject rocksCloseable : allCreatedCloseables) {
-                       verify(rocksCloseable, times(0)).close();
-               }
+                       this.keyedStateBackend.dispose();
+                       verify(spyDB, times(1)).close();
+                       assertEquals(null, keyedStateBackend.db);
 
-               snapshot.cancel(true);
+                       //Ensure every RocksObjects not closed yet
+                       for (RocksObject rocksCloseable : allCreatedCloseables) 
{
+                               verify(rocksCloseable, times(0)).close();
+                       }
 
-               //Ensure every RocksObjects was closed exactly once
-               for (RocksObject rocksCloseable : allCreatedCloseables) {
-                       verify(rocksCloseable, times(1)).close();
-               }
+                       snapshot.cancel(true);
 
+                       //Ensure every RocksObjects was closed exactly once
+                       for (RocksObject rocksCloseable : allCreatedCloseables) 
{
+                               verify(rocksCloseable, times(1)).close();
+                       }
+               } finally {
+                       keyedStateBackend.dispose();
+                       keyedStateBackend = null;
+               }
        }
 
        @Test
        public void testDismissingSnapshot() throws Exception {
                setupRocksKeyedStateBackend();
-               RunnableFuture<KeyedStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, 
CheckpointOptions.forFullCheckpoint());
-               snapshot.cancel(true);
-               verifyRocksObjectsReleased();
+               try {
+                       RunnableFuture<KeyedStateHandle> snapshot =
+                               keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, CheckpointOptions.forFullCheckpoint());
+                       snapshot.cancel(true);
+                       verifyRocksObjectsReleased();
+               } finally {
+                       this.keyedStateBackend.dispose();
+                       this.keyedStateBackend = null;
+               }
        }
 
        @Test
        public void testDismissingSnapshotNotRunnable() throws Exception {
                setupRocksKeyedStateBackend();
-               RunnableFuture<KeyedStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, 
CheckpointOptions.forFullCheckpoint());
-               snapshot.cancel(true);
-               Thread asyncSnapshotThread = new Thread(snapshot);
-               asyncSnapshotThread.start();
                try {
-                       snapshot.get();
-                       fail();
-               } catch (Exception ignored) {
+                       RunnableFuture<KeyedStateHandle> snapshot =
+                               keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, CheckpointOptions.forFullCheckpoint());
+                       snapshot.cancel(true);
+                       Thread asyncSnapshotThread = new Thread(snapshot);
+                       asyncSnapshotThread.start();
+                       try {
+                               snapshot.get();
+                               fail();
+                       } catch (Exception ignored) {
 
+                       }
+                       asyncSnapshotThread.join();
+                       verifyRocksObjectsReleased();
+               } finally {
+                       this.keyedStateBackend.dispose();
+                       this.keyedStateBackend = null;
                }
-               asyncSnapshotThread.join();
-               verifyRocksObjectsReleased();
        }
 
        @Test
        public void testCompletingSnapshot() throws Exception {
                setupRocksKeyedStateBackend();
-               RunnableFuture<KeyedStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, 
CheckpointOptions.forFullCheckpoint());
-               Thread asyncSnapshotThread = new Thread(snapshot);
-               asyncSnapshotThread.start();
-               waiter.await(); // wait for snapshot to run
-               waiter.reset();
-               runStateUpdates();
-               blocker.trigger(); // allow checkpointing to start writing
-               waiter.await(); // wait for snapshot stream writing to run
-               KeyedStateHandle keyedStateHandle = snapshot.get();
-               assertNotNull(keyedStateHandle);
-               assertTrue(keyedStateHandle.getStateSize() > 0);
-               assertEquals(2, 
keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
-               assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
-               asyncSnapshotThread.join();
-               verifyRocksObjectsReleased();
+               try {
+                       RunnableFuture<KeyedStateHandle> snapshot =
+                               keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, CheckpointOptions.forFullCheckpoint());
+                       Thread asyncSnapshotThread = new Thread(snapshot);
+                       asyncSnapshotThread.start();
+                       waiter.await(); // wait for snapshot to run
+                       waiter.reset();
+                       runStateUpdates();
+                       blocker.trigger(); // allow checkpointing to start 
writing
+                       waiter.await(); // wait for snapshot stream writing to 
run
+                       KeyedStateHandle keyedStateHandle = snapshot.get();
+                       assertNotNull(keyedStateHandle);
+                       assertTrue(keyedStateHandle.getStateSize() > 0);
+                       assertEquals(2, 
keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
+                       
assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+                       asyncSnapshotThread.join();
+                       verifyRocksObjectsReleased();
+               } finally {
+                       this.keyedStateBackend.dispose();
+                       this.keyedStateBackend = null;
+               }
        }
 
        @Test
        public void testCancelRunningSnapshot() throws Exception {
                setupRocksKeyedStateBackend();
-               RunnableFuture<KeyedStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, 
CheckpointOptions.forFullCheckpoint());
-               Thread asyncSnapshotThread = new Thread(snapshot);
-               asyncSnapshotThread.start();
-               waiter.await(); // wait for snapshot to run
-               waiter.reset();
-               runStateUpdates();
-               snapshot.cancel(true);
-               blocker.trigger(); // allow checkpointing to start writing
-               assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
-               waiter.await(); // wait for snapshot stream writing to run
                try {
-                       snapshot.get();
-                       fail();
-               } catch (Exception ignored) {
-               }
+                       RunnableFuture<KeyedStateHandle> snapshot = 
keyedStateBackend.snapshot(0L, 0L, testStreamFactory, 
CheckpointOptions.forFullCheckpoint());
+                       Thread asyncSnapshotThread = new Thread(snapshot);
+                       asyncSnapshotThread.start();
+                       waiter.await(); // wait for snapshot to run
+                       waiter.reset();
+                       runStateUpdates();
+                       snapshot.cancel(true);
+                       blocker.trigger(); // allow checkpointing to start 
writing
+                       
assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+                       waiter.await(); // wait for snapshot stream writing to 
run
+                       try {
+                               snapshot.get();
+                               fail();
+                       } catch (Exception ignored) {
+                       }
 
-               asyncSnapshotThread.join();
-               verifyRocksObjectsReleased();
+                       asyncSnapshotThread.join();
+                       verifyRocksObjectsReleased();
+               } finally {
+                       this.keyedStateBackend.dispose();
+                       this.keyedStateBackend = null;
+               }
        }
 
        @Test
        public void testDisposeDeletesAllDirectories() throws Exception {
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
-               ValueStateDescriptor<String> kvId =
+               Collection<File> allFilesInDbDir =
+                       FileUtils.listFilesAndDirs(new File(dbPath), new 
AcceptAllFilter(), new AcceptAllFilter());
+               try {
+                       ValueStateDescriptor<String> kvId =
                                new ValueStateDescriptor<>("id", String.class, 
null);
 
-               kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+                       kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
 
-               ValueState<String> state =
+                       ValueState<String> state =
                                
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
-               backend.setCurrentKey(1);
-               state.update("Hello");
-
-               Collection<File> allFilesInDbDir =
-                               FileUtils.listFilesAndDirs(new File(dbPath), 
new AcceptAllFilter(), new AcceptAllFilter());
-
-               // more than just the root directory
-               assertTrue(allFilesInDbDir.size() > 1);
-
-               backend.dispose();
+                       backend.setCurrentKey(1);
+                       state.update("Hello");
 
+                       // more than just the root directory
+                       assertTrue(allFilesInDbDir.size() > 1);
+               } finally {
+                       IOUtils.closeQuietly(backend);
+                       backend.dispose();
+               }
                allFilesInDbDir =
-                               FileUtils.listFilesAndDirs(new File(dbPath), 
new AcceptAllFilter(), new AcceptAllFilter());
+                       FileUtils.listFilesAndDirs(new File(dbPath), new 
AcceptAllFilter(), new AcceptAllFilter());
 
                // just the root directory left
                assertEquals(1, allFilesInDbDir.size());
@@ -390,62 +445,64 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase<RocksDBStateBa
        public void testSharedIncrementalStateDeRegistration() throws Exception 
{
                if (enableIncrementalCheckpointing) {
                        AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
-                       ValueStateDescriptor<String> kvId =
-                               new ValueStateDescriptor<>("id", String.class, 
null);
+                       try {
+                               ValueStateDescriptor<String> kvId =
+                                       new ValueStateDescriptor<>("id", 
String.class, null);
 
-                       kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
+                               kvId.initializeSerializerUnlessSet(new 
ExecutionConfig());
 
-                       ValueState<String> state =
-                               
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+                               ValueState<String> state =
+                                       
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
-                       Queue<IncrementalKeyedStateHandle> previousStateHandles 
= new LinkedList<>();
-                       SharedStateRegistry sharedStateRegistry = spy(new 
SharedStateRegistry());
-                       for (int checkpointId = 0; checkpointId < 3; 
++checkpointId) {
+                               Queue<IncrementalKeyedStateHandle> 
previousStateHandles = new LinkedList<>();
+                               SharedStateRegistry sharedStateRegistry = 
spy(new SharedStateRegistry());
+                               for (int checkpointId = 0; checkpointId < 3; 
++checkpointId) {
 
-                               reset(sharedStateRegistry);
+                                       reset(sharedStateRegistry);
 
-                               backend.setCurrentKey(checkpointId);
-                               state.update("Hello-" + checkpointId);
+                                       backend.setCurrentKey(checkpointId);
+                                       state.update("Hello-" + checkpointId);
 
-                               RunnableFuture<KeyedStateHandle> snapshot = 
backend.snapshot(
-                                       checkpointId,
-                                       checkpointId,
-                                       createStreamFactory(),
-                                       CheckpointOptions.forFullCheckpoint());
+                                       RunnableFuture<KeyedStateHandle> 
snapshot = backend.snapshot(
+                                               checkpointId,
+                                               checkpointId,
+                                               createStreamFactory(),
+                                               
CheckpointOptions.forFullCheckpoint());
 
-                               snapshot.run();
+                                       snapshot.run();
 
-                               IncrementalKeyedStateHandle stateHandle = 
(IncrementalKeyedStateHandle) snapshot.get();
-                               Map<StateHandleID, StreamStateHandle> 
sharedState =
-                                       new 
HashMap<>(stateHandle.getSharedState());
+                                       IncrementalKeyedStateHandle stateHandle 
= (IncrementalKeyedStateHandle) snapshot.get();
+                                       Map<StateHandleID, StreamStateHandle> 
sharedState =
+                                               new 
HashMap<>(stateHandle.getSharedState());
 
-                               
stateHandle.registerSharedStates(sharedStateRegistry);
+                                       
stateHandle.registerSharedStates(sharedStateRegistry);
 
-                               for (Map.Entry<StateHandleID, 
StreamStateHandle> e : sharedState.entrySet()) {
-                                       
verify(sharedStateRegistry).registerReference(
-                                               
stateHandle.createSharedStateRegistryKeyFromFileName(e.getKey()),
-                                               e.getValue());
-                               }
+                                       for (Map.Entry<StateHandleID, 
StreamStateHandle> e : sharedState.entrySet()) {
+                                               
verify(sharedStateRegistry).registerReference(
+                                                       
stateHandle.createSharedStateRegistryKeyFromFileName(e.getKey()),
+                                                       e.getValue());
+                                       }
 
-                               previousStateHandles.add(stateHandle);
-                               backend.notifyCheckpointComplete(checkpointId);
+                                       previousStateHandles.add(stateHandle);
+                                       
backend.notifyCheckpointComplete(checkpointId);
 
-                               
//-----------------------------------------------------------------
+                                       
//-----------------------------------------------------------------
 
-                               if (previousStateHandles.size() > 1) {
-                                       
checkRemove(previousStateHandles.remove(), sharedStateRegistry);
+                                       if (previousStateHandles.size() > 1) {
+                                               
checkRemove(previousStateHandles.remove(), sharedStateRegistry);
+                                       }
                                }
-                       }
 
-                       while (!previousStateHandles.isEmpty()) {
+                               while (!previousStateHandles.isEmpty()) {
 
-                               reset(sharedStateRegistry);
+                                       reset(sharedStateRegistry);
 
-                               checkRemove(previousStateHandles.remove(), 
sharedStateRegistry);
+                                       
checkRemove(previousStateHandles.remove(), sharedStateRegistry);
+                               }
+                       } finally {
+                               IOUtils.closeQuietly(backend);
+                               backend.dispose();
                        }
-
-                       backend.close();
-                       backend.dispose();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ca87bec4/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 6debff7..f6f73f2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.runtime.state;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -64,14 +60,17 @@ import org.apache.flink.runtime.state.heap.StateTable;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
-
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -177,17 +176,15 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        Environment env) throws Exception {
 
                AbstractKeyedStateBackend<K> backend = 
getStateBackend().createKeyedStateBackend(
-                               env,
-                               new JobID(),
-                               "test_op",
-                               keySerializer,
-                               numberOfKeyGroups,
-                               keyGroupRange,
-                               env.getTaskKvStateRegistry());
+                       env,
+                       new JobID(),
+                       "test_op",
+                       keySerializer,
+                       numberOfKeyGroups,
+                       keyGroupRange,
+                       env.getTaskKvStateRegistry());
 
-               if (null != state) {
-                       backend.restore(state);
-               }
+               backend.restore(state);
 
                return backend;
        }
@@ -244,6 +241,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                }
 
                assertEquals("Didn't see the expected Kryo exception.", 1, 
numExceptions);
+               backend.dispose();
        }
 
        @Test
@@ -303,6 +301,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                }
 
                assertEquals("Didn't see the expected Kryo exception.", 1, 
numExceptions);
+               backend.dispose();
        }
 
        @Test
@@ -356,6 +355,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                }
 
                assertEquals("Didn't see the expected Kryo exception.", 1, 
numExceptions);
+               backend.dispose();
        }
 
        @Test
@@ -411,6 +411,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                }
 
                assertEquals("Didn't see the expected Kryo exception.", 1, 
numExceptions);
+               backend.dispose();
        }
 
 
@@ -488,81 +489,91 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                CheckpointStreamFactory streamFactory = createStreamFactory();
                SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
                Environment env = new DummyEnvironment("test", 1, 0);
-               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
+               AbstractKeyedStateBackend<Integer> backend = null;
 
-               TypeInformation<TestPojo> pojoType = new 
GenericTypeInfo<>(TestPojo.class);
+               try {
+                       backend = createKeyedBackend(IntSerializer.INSTANCE, 
env);
 
-               // make sure that we are in fact using the KryoSerializer
-               assertTrue(pojoType.createSerializer(env.getExecutionConfig()) 
instanceof KryoSerializer);
+                       TypeInformation<TestPojo> pojoType = new 
GenericTypeInfo<>(TestPojo.class);
 
-               ValueStateDescriptor<TestPojo> kvId = new 
ValueStateDescriptor<>("id", pojoType);
+                       // make sure that we are in fact using the 
KryoSerializer
+                       
assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof 
KryoSerializer);
 
-               ValueState<TestPojo> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+                       ValueStateDescriptor<TestPojo> kvId = new 
ValueStateDescriptor<>("id", pojoType);
 
-               // ============== create snapshot - no Kryo registration or 
specific / default serializers ==============
+                       ValueState<TestPojo> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
-               // make some more modifications
-               backend.setCurrentKey(1);
-               state.update(new TestPojo("u1", 1));
+                       // ============== create snapshot - no Kryo 
registration or specific / default serializers ==============
 
-               backend.setCurrentKey(2);
-               state.update(new TestPojo("u2", 2));
+                       // make some more modifications
+                       backend.setCurrentKey(1);
+                       state.update(new TestPojo("u1", 1));
 
-               KeyedStateHandle snapshot = runSnapshot(backend.snapshot(
+                       backend.setCurrentKey(2);
+                       state.update(new TestPojo("u2", 2));
+
+                       KeyedStateHandle snapshot = 
runSnapshot(backend.snapshot(
                                682375462378L,
                                2,
                                streamFactory,
                                CheckpointOptions.forFullCheckpoint()));
 
-               snapshot.registerSharedStates(sharedStateRegistry);
-               backend.dispose();
+                       snapshot.registerSharedStates(sharedStateRegistry);
+                       backend.dispose();
 
-               // ========== restore snapshot - should use default serializer 
(ONLY SERIALIZATION) ==========
+                       // ========== restore snapshot - should use default 
serializer (ONLY SERIALIZATION) ==========
 
-               // cast because our test serializer is not typed to TestPojo
-               
env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) 
CustomKryoTestSerializer.class);
+                       // cast because our test serializer is not typed to 
TestPojo
+                       
env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) 
CustomKryoTestSerializer.class);
 
-               backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, 
env);
+                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot, env);
 
-               // re-initialize to ensure that we create the KryoSerializer 
from scratch, otherwise
-               // initializeSerializerUnlessSet would not pick up our new 
config
-               kvId = new ValueStateDescriptor<>("id", pojoType);
-               state = backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+                       // re-initialize to ensure that we create the 
KryoSerializer from scratch, otherwise
+                       // initializeSerializerUnlessSet would not pick up our 
new config
+                       kvId = new ValueStateDescriptor<>("id", pojoType);
+                       state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
-               backend.setCurrentKey(1);
+                       backend.setCurrentKey(1);
 
-               // update to test state backends that eagerly serialize, such 
as RocksDB
-               state.update(new TestPojo("u1", 11));
+                       // update to test state backends that eagerly 
serialize, such as RocksDB
+                       state.update(new TestPojo("u1", 11));
 
-               KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(
+                       KeyedStateHandle snapshot2 = 
runSnapshot(backend.snapshot(
                                682375462378L,
                                2,
                                streamFactory,
                                CheckpointOptions.forFullCheckpoint()));
 
-               snapshot2.registerSharedStates(sharedStateRegistry);
+                       snapshot2.registerSharedStates(sharedStateRegistry);
+                       snapshot.discardState();
 
-               snapshot.discardState();
+                       backend.dispose();
 
-               backend.dispose();
+                       // ========= restore snapshot - should use default 
serializer (FAIL ON DESERIALIZATION) =========
 
-               // ========= restore snapshot - should use default serializer 
(FAIL ON DESERIALIZATION) =========
+                       // cast because our test serializer is not typed to 
TestPojo
+                       
env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) 
CustomKryoTestSerializer.class);
 
-               // cast because our test serializer is not typed to TestPojo
-               
env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) 
CustomKryoTestSerializer.class);
+                       // on the second restore, since the custom serializer 
will be used for
+                       // deserialization, we expect the deliberate failure to 
be thrown
+                       
expectedException.expect(ExpectedKryoTestException.class);
 
-               // on the second restore, since the custom serializer will be 
used for
-               // deserialization, we expect the deliberate failure to be 
thrown
-               expectedException.expect(ExpectedKryoTestException.class);
+                       // state backends that eagerly deserializes (such as 
the memory state backend) will fail here
+                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot2, env);
 
-               // state backends that eagerly deserializes (such as the memory 
state backend) will fail here
-               backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot2, env);
+                       state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
-               state = backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+                       backend.setCurrentKey(1);
+                       // state backends that lazily deserializes (such as 
RocksDB) will fail here
+                       state.value();
 
-               backend.setCurrentKey(1);
-               // state backends that lazily deserializes (such as RocksDB) 
will fail here
-               state.value();
+                       snapshot2.discardState();
+                       backend.dispose();
+               } finally {
+                       // ensure to release native resources even when we exit 
through exception
+                       IOUtils.closeQuietly(backend);
+                       backend.dispose();
+               }
        }
 
        /**
@@ -581,78 +592,89 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
                Environment env = new DummyEnvironment("test", 1, 0);
 
-               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
+               AbstractKeyedStateBackend<Integer> backend = null;
 
-               TypeInformation<TestPojo> pojoType = new 
GenericTypeInfo<>(TestPojo.class);
+               try {
+                       backend = createKeyedBackend(IntSerializer.INSTANCE, 
env);
 
-               // make sure that we are in fact using the KryoSerializer
-               assertTrue(pojoType.createSerializer(env.getExecutionConfig()) 
instanceof KryoSerializer);
+                       TypeInformation<TestPojo> pojoType = new 
GenericTypeInfo<>(TestPojo.class);
 
-               ValueStateDescriptor<TestPojo> kvId = new 
ValueStateDescriptor<>("id", pojoType);
-               ValueState<TestPojo> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+                       // make sure that we are in fact using the 
KryoSerializer
+                       
assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof 
KryoSerializer);
 
-               // ============== create snapshot - no Kryo registration or 
specific / default serializers ==============
+                       ValueStateDescriptor<TestPojo> kvId = new 
ValueStateDescriptor<>("id", pojoType);
+                       ValueState<TestPojo> state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
-               // make some more modifications
-               backend.setCurrentKey(1);
-               state.update(new TestPojo("u1", 1));
+                       // ============== create snapshot - no Kryo 
registration or specific / default serializers ==============
 
-               backend.setCurrentKey(2);
-               state.update(new TestPojo("u2", 2));
+                       // make some more modifications
+                       backend.setCurrentKey(1);
+                       state.update(new TestPojo("u1", 1));
 
-               KeyedStateHandle snapshot = runSnapshot(backend.snapshot(
+                       backend.setCurrentKey(2);
+                       state.update(new TestPojo("u2", 2));
+
+                       KeyedStateHandle snapshot = 
runSnapshot(backend.snapshot(
                                682375462378L,
                                2,
                                streamFactory,
                                CheckpointOptions.forFullCheckpoint()));
 
-               snapshot.registerSharedStates(sharedStateRegistry);
-               backend.dispose();
+                       snapshot.registerSharedStates(sharedStateRegistry);
+                       backend.dispose();
 
-               // ========== restore snapshot - should use specific serializer 
(ONLY SERIALIZATION) ==========
+                       // ========== restore snapshot - should use specific 
serializer (ONLY SERIALIZATION) ==========
 
-               
env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, 
CustomKryoTestSerializer.class);
+                       
env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, 
CustomKryoTestSerializer.class);
 
-               backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, 
env);
+                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot, env);
 
-               // re-initialize to ensure that we create the KryoSerializer 
from scratch, otherwise
-               // initializeSerializerUnlessSet would not pick up our new 
config
-               kvId = new ValueStateDescriptor<>("id", pojoType);
-               state = backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+                       // re-initialize to ensure that we create the 
KryoSerializer from scratch, otherwise
+                       // initializeSerializerUnlessSet would not pick up our 
new config
+                       kvId = new ValueStateDescriptor<>("id", pojoType);
+                       state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
-               backend.setCurrentKey(1);
+                       backend.setCurrentKey(1);
 
-               // update to test state backends that eagerly serialize, such 
as RocksDB
-               state.update(new TestPojo("u1", 11));
+                       // update to test state backends that eagerly 
serialize, such as RocksDB
+                       state.update(new TestPojo("u1", 11));
 
-               KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(
+                       KeyedStateHandle snapshot2 = 
runSnapshot(backend.snapshot(
                                682375462378L,
                                2,
                                streamFactory,
                                CheckpointOptions.forFullCheckpoint()));
 
-               snapshot2.registerSharedStates(sharedStateRegistry);
+                       snapshot2.registerSharedStates(sharedStateRegistry);
 
-               snapshot.discardState();
+                       snapshot.discardState();
 
-               backend.dispose();
+                       backend.dispose();
 
-               // ========= restore snapshot - should use specific serializer 
(FAIL ON DESERIALIZATION) =========
+                       // ========= restore snapshot - should use specific 
serializer (FAIL ON DESERIALIZATION) =========
 
-               
env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, 
CustomKryoTestSerializer.class);
+                       
env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, 
CustomKryoTestSerializer.class);
 
-               // on the second restore, since the custom serializer will be 
used for
-               // deserialization, we expect the deliberate failure to be 
thrown
-               expectedException.expect(ExpectedKryoTestException.class);
+                       // on the second restore, since the custom serializer 
will be used for
+                       // deserialization, we expect the deliberate failure to 
be thrown
+                       
expectedException.expect(ExpectedKryoTestException.class);
 
-               // state backends that eagerly deserializes (such as the memory 
state backend) will fail here
-               backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot2, env);
+                       // state backends that eagerly deserializes (such as 
the memory state backend) will fail here
+                       backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot2, env);
 
-               state = backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+                       state = 
backend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
 
-               backend.setCurrentKey(1);
-               // state backends that lazily deserializes (such as RocksDB) 
will fail here
-               state.value();
+                       backend.setCurrentKey(1);
+                       // state backends that lazily deserializes (such as 
RocksDB) will fail here
+                       state.value();
+
+                       backend.dispose();
+               } finally {
+                       // ensure that native resources are also released in 
case of exception
+                       if (backend != null) {
+                               backend.dispose();
+                       }
+               }
        }
 
        @Test
@@ -1726,7 +1748,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                final int MAX_PARALLELISM = 10;
 
                CheckpointStreamFactory streamFactory = createStreamFactory();
-               AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(
+               final AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(
                                IntSerializer.INSTANCE,
                                MAX_PARALLELISM,
                                new KeyGroupRange(0, MAX_PARALLELISM - 1),
@@ -1770,7 +1792,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                backend.dispose();
 
                // backend for the first half of the key group range
-               AbstractKeyedStateBackend<Integer> firstHalfBackend = 
restoreKeyedBackend(
+               final AbstractKeyedStateBackend<Integer> firstHalfBackend = 
restoreKeyedBackend(
                                IntSerializer.INSTANCE,
                                MAX_PARALLELISM,
                                new KeyGroupRange(0, 4),
@@ -1778,7 +1800,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                new DummyEnvironment("test", 1, 0));
 
                // backend for the second half of the key group range
-               AbstractKeyedStateBackend<Integer> secondHalfBackend = 
restoreKeyedBackend(
+               final AbstractKeyedStateBackend<Integer> secondHalfBackend = 
restoreKeyedBackend(
                                IntSerializer.INSTANCE,
                                MAX_PARALLELISM,
                                new KeyGroupRange(5, 9),
@@ -2017,7 +2039,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
        @Test
        public void testCopyDefaultValue() throws Exception {
-               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+               final AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
                ValueStateDescriptor<IntValue> kvId = new 
ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
 
@@ -2044,7 +2066,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
         */
        @Test
        public void testRequireNonNullNamespace() throws Exception {
-               AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+               final AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
 
                ValueStateDescriptor<IntValue> kvId = new 
ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
 
@@ -2076,7 +2098,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        @SuppressWarnings("unchecked")
        protected void testConcurrentMapIfQueryable() throws Exception {
                final int numberOfKeyGroups = 1;
-               AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(
+               final AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(
                                IntSerializer.INSTANCE,
                                numberOfKeyGroups,
                                new KeyGroupRange(0, 0),
@@ -2384,9 +2406,9 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                streamFactory.setBlockerLatch(blocker);
                streamFactory.setAfterNumberInvocations(10);
 
-               AbstractKeyedStateBackend<Integer> backend = null;
+               final AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE);
+
                try {
-                       backend = createKeyedBackend(IntSerializer.INSTANCE);
 
                        if (!backend.supportsAsynchronousSnapshots()) {
                                return;
@@ -2413,14 +2435,11 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        waiter.await();
 
                        // close the backend to see if the close is propagated 
to the stream
-                       backend.close();
+                       IOUtils.closeQuietly(backend);
 
                        //unblock the stream so that it can run into the 
IOException
                        blocker.trigger();
 
-                       //dispose the backend
-                       backend.dispose();
-
                        runner.join();
 
                        try {
@@ -2431,10 +2450,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        }
 
                } finally {
-                       if (null != backend) {
-                               IOUtils.closeQuietly(backend);
-                               backend.dispose();
-                       }
+                       backend.dispose();
                }
        }
 

Reply via email to