http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
deleted file mode 100644
index 5507339..0000000
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ /dev/null
@@ -1,2033 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
-import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
-import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
-import org.apache.flink.runtime.state.DoneFuture;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
-import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
-import org.apache.flink.runtime.state.StateHandleID;
-import org.apache.flink.runtime.state.StateObject;
-import org.apache.flink.runtime.state.StateUtil;
-import org.apache.flink.runtime.state.StreamCompressionDecorator;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.apache.flink.runtime.state.internal.InternalFoldingState;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.apache.flink.runtime.state.internal.InternalValueState;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.ResourceGuard;
-import org.apache.flink.util.StateMigrationException;
-
-import org.rocksdb.Checkpoint;
-import org.rocksdb.ColumnFamilyDescriptor;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.Snapshot;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.RunnableFuture;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-/**
- * An {@link AbstractKeyedStateBackend} that stores its state in {@code 
RocksDB} and serializes state to
- * streams provided by a {@link 
org.apache.flink.runtime.state.CheckpointStreamFactory} upon
- * checkpointing. This state backend can store very large state that exceeds 
memory and spills
- * to disk. Except for the snapshotting, this class should be accessed as if 
it is not threadsafe.
- *
- * <p>This class follows the rules for closing/releasing native RocksDB 
resources as described in
- + <a 
href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families";>
- * this document</a>.
- */
-public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
-
-       /** The name of the merge operator in RocksDB. Do not change except you 
know exactly what you do. */
-       public static final String MERGE_OPERATOR_NAME = "stringappendtest";
-
-       /** File suffix of sstable files. */
-       private static final String SST_FILE_SUFFIX = ".sst";
-
-       /** Bytes for the name of the column descriptor for the default column 
family. */
-       public static final byte[] DEFAULT_COLUMN_FAMILY_NAME_BYTES = 
"default".getBytes(ConfigConstants.DEFAULT_CHARSET);
-
-       /** String that identifies the operator that owns this backend. */
-       private final String operatorIdentifier;
-
-       /** The column family options from the options factory. */
-       private final ColumnFamilyOptions columnOptions;
-
-       /** The DB options from the options factory. */
-       private final DBOptions dbOptions;
-
-       /** Path where this configured instance stores its data directory. */
-       private final File instanceBasePath;
-
-       /** Path where this configured instance stores its RocksDB database. */
-       private final File instanceRocksDBPath;
-
-       /**
-        * Protects access to RocksDB in other threads, like the checkpointing 
thread from parallel call that disposes the
-        * RocksDb object.
-        */
-       private final ResourceGuard rocksDBResourceGuard;
-
-       /**
-        * Our RocksDB database, this is used by the actual subclasses of 
{@link AbstractRocksDBState}
-        * to store state. The different k/v states that we have don't each 
have their own RocksDB
-        * instance. They all write to this instance but to their own column 
family.
-        */
-       protected RocksDB db;
-
-       /**
-        * We are not using the default column family for Flink state ops, but 
we still need to remember this handle so that
-        * we can close it properly when the backend is closed. This is 
required by RocksDB's native memory management.
-        */
-       private ColumnFamilyHandle defaultColumnFamily;
-
-       /**
-        * Information about the k/v states as we create them. This is used to 
retrieve the
-        * column family that is used for a state and also for sanity checks 
when restoring.
-        */
-       private final Map<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation;
-
-       /**
-        * Map of state names to their corresponding restored state meta info.
-        *
-        * <p>TODO this map can be removed when eager-state registration is in 
place.
-        * TODO we currently need this cached to check state migration 
strategies when new serializers are registered.
-        */
-       private final Map<String, 
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
-
-       /** Number of bytes required to prefix the key groups. */
-       private final int keyGroupPrefixBytes;
-
-       /** True if incremental checkpointing is enabled. */
-       private final boolean enableIncrementalCheckpointing;
-
-       /** The state handle ids of all sst files materialized in snapshots for 
previous checkpoints. */
-       private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
-
-       /** The identifier of the last completed checkpoint. */
-       private long lastCompletedCheckpointId = -1L;
-
-       /** Unique ID of this backend. */
-       private UUID backendUID;
-
-       public RocksDBKeyedStateBackend(
-               String operatorIdentifier,
-               ClassLoader userCodeClassLoader,
-               File instanceBasePath,
-               DBOptions dbOptions,
-               ColumnFamilyOptions columnFamilyOptions,
-               TaskKvStateRegistry kvStateRegistry,
-               TypeSerializer<K> keySerializer,
-               int numberOfKeyGroups,
-               KeyGroupRange keyGroupRange,
-               ExecutionConfig executionConfig,
-               boolean enableIncrementalCheckpointing
-       ) throws IOException {
-
-               super(kvStateRegistry, keySerializer, userCodeClassLoader, 
numberOfKeyGroups, keyGroupRange, executionConfig);
-
-               this.operatorIdentifier = 
Preconditions.checkNotNull(operatorIdentifier);
-
-               this.enableIncrementalCheckpointing = 
enableIncrementalCheckpointing;
-               this.rocksDBResourceGuard = new ResourceGuard();
-
-               // ensure that we use the right merge operator, because other 
code relies on this
-               this.columnOptions = 
Preconditions.checkNotNull(columnFamilyOptions)
-                       .setMergeOperatorName(MERGE_OPERATOR_NAME);
-
-               this.dbOptions = Preconditions.checkNotNull(dbOptions);
-
-               this.instanceBasePath = 
Preconditions.checkNotNull(instanceBasePath);
-               this.instanceRocksDBPath = new File(instanceBasePath, "db");
-
-               if (instanceBasePath.exists()) {
-                       // Clear the base directory when the backend is created
-                       // in case something crashed and the backend never 
reached dispose()
-                       cleanInstanceBasePath();
-               }
-
-               if (!instanceBasePath.mkdirs()) {
-                       throw new IOException(
-                                       String.format("Could not create RocksDB 
data directory at %s.", instanceBasePath.getAbsolutePath()));
-               }
-
-               this.keyGroupPrefixBytes = getNumberOfKeyGroups() > 
(Byte.MAX_VALUE + 1) ? 2 : 1;
-               this.kvStateInformation = new HashMap<>();
-               this.restoredKvStateMetaInfos = new HashMap<>();
-               this.materializedSstFiles = new TreeMap<>();
-               this.backendUID = UUID.randomUUID();
-               LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
-       }
-
-       @Override
-       public <N> Stream<K> getKeys(String state, N namespace) {
-               Tuple2<ColumnFamilyHandle, ?> columnInfo = 
kvStateInformation.get(state);
-               if (columnInfo == null) {
-                       return Stream.empty();
-               }
-
-               RocksIterator iterator = db.newIterator(columnInfo.f0);
-               iterator.seekToFirst();
-
-               Iterable<K> iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
-               Stream<K> targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
-               return targetStream.onClose(iterator::close);
-       }
-
-       /**
-        * Should only be called by one thread, and only after all accesses to 
the DB happened.
-        */
-       @Override
-       public void dispose() {
-               super.dispose();
-
-               // This call will block until all clients that still acquire 
access to the RocksDB instance have released it,
-               // so that we cannot release the native resources while clients 
are still working with it in parallel.
-               rocksDBResourceGuard.close();
-
-               // IMPORTANT: null reference to signal potential async 
checkpoint workers that the db was disposed, as
-               // working on the disposed object results in SEGFAULTS.
-               if (db != null) {
-
-                       // RocksDB's native memory management requires that 
*all* CFs (including default) are closed before the
-                       // DB is closed. So we start with the ones created by 
Flink...
-                       for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData :
-                               kvStateInformation.values()) {
-                               IOUtils.closeQuietly(columnMetaData.f0);
-                       }
-
-                       // ... close the default CF ...
-                       IOUtils.closeQuietly(defaultColumnFamily);
-
-                       // ... and finally close the DB instance ...
-                       IOUtils.closeQuietly(db);
-
-                       // invalidate the reference before releasing the lock 
so that other accesses will not cause crashes
-                       db = null;
-               }
-
-               kvStateInformation.clear();
-               restoredKvStateMetaInfos.clear();
-
-               IOUtils.closeQuietly(dbOptions);
-               IOUtils.closeQuietly(columnOptions);
-
-               cleanInstanceBasePath();
-       }
-
-       private void cleanInstanceBasePath() {
-               LOG.info("Deleting existing instance base directory {}.", 
instanceBasePath);
-
-               try {
-                       FileUtils.deleteDirectory(instanceBasePath);
-               } catch (IOException ex) {
-                       LOG.warn("Could not delete instance base path for 
RocksDB: " + instanceBasePath, ex);
-               }
-       }
-
-       public int getKeyGroupPrefixBytes() {
-               return keyGroupPrefixBytes;
-       }
-
-       /**
-        * Triggers an asynchronous snapshot of the keyed state backend from 
RocksDB. This snapshot can be canceled and
-        * is also stopped when the backend is closed through {@link 
#dispose()}. For each backend, this method must always
-        * be called by the same thread.
-        *
-        * @param checkpointId  The Id of the checkpoint.
-        * @param timestamp     The timestamp of the checkpoint.
-        * @param streamFactory The factory that we can use for writing our 
state to streams.
-        * @param checkpointOptions Options for how to perform this checkpoint.
-        * @return Future to the state handle of the snapshot data.
-        * @throws Exception
-        */
-       @Override
-       public RunnableFuture<KeyedStateHandle> snapshot(
-               final long checkpointId,
-               final long timestamp,
-               final CheckpointStreamFactory streamFactory,
-               CheckpointOptions checkpointOptions) throws Exception {
-
-               if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-                       enableIncrementalCheckpointing) {
-                       return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-               } else {
-                       return snapshotFully(checkpointId, timestamp, 
streamFactory);
-               }
-       }
-
-       private RunnableFuture<KeyedStateHandle> snapshotIncrementally(
-               final long checkpointId,
-               final long checkpointTimestamp,
-               final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-               if (db == null) {
-                       throw new IOException("RocksDB closed.");
-               }
-
-               if (kvStateInformation.isEmpty()) {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-                                               checkpointTimestamp);
-                       }
-                       return DoneFuture.nullValue();
-               }
-
-               final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
-                       new RocksDBIncrementalSnapshotOperation<>(
-                               this,
-                               checkpointStreamFactory,
-                               checkpointId,
-                               checkpointTimestamp);
-
-               try {
-                       snapshotOperation.takeSnapshot();
-               } catch (Exception e) {
-                       snapshotOperation.stop();
-                       snapshotOperation.releaseResources(true);
-                       throw e;
-               }
-
-               return new FutureTask<KeyedStateHandle>(
-                       new Callable<KeyedStateHandle>() {
-                               @Override
-                               public KeyedStateHandle call() throws Exception 
{
-                                       return 
snapshotOperation.materializeSnapshot();
-                               }
-                       }
-               ) {
-                       @Override
-                       public boolean cancel(boolean mayInterruptIfRunning) {
-                               snapshotOperation.stop();
-                               return super.cancel(mayInterruptIfRunning);
-                       }
-
-                       @Override
-                       protected void done() {
-                               
snapshotOperation.releaseResources(isCancelled());
-                       }
-               };
-       }
-
-       private RunnableFuture<KeyedStateHandle> snapshotFully(
-               final long checkpointId,
-               final long timestamp,
-               final CheckpointStreamFactory streamFactory) throws Exception {
-
-               long startTime = System.currentTimeMillis();
-               final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry();
-
-               final RocksDBFullSnapshotOperation<K> snapshotOperation;
-
-               if (kvStateInformation.isEmpty()) {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.", timestamp);
-                       }
-
-                       return DoneFuture.nullValue();
-               }
-
-               snapshotOperation = new RocksDBFullSnapshotOperation<>(this, 
streamFactory, snapshotCloseableRegistry);
-               snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
-
-               // implementation of the async IO operation, based on FutureTask
-               AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable 
=
-                       new 
AbstractAsyncCallableWithResources<KeyedStateHandle>() {
-
-                               @Override
-                               protected void acquireResources() throws 
Exception {
-                                       
cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
-                                       
snapshotOperation.openCheckpointStream();
-                               }
-
-                               @Override
-                               protected void releaseResources() throws 
Exception {
-                                       closeLocalRegistry();
-                                       releaseSnapshotOperationResources();
-                               }
-
-                               private void 
releaseSnapshotOperationResources() {
-                                       // hold the db lock while operation on 
the db to guard us against async db disposal
-                                       
snapshotOperation.releaseSnapshotResources();
-                               }
-
-                               @Override
-                               protected void stopOperation() throws Exception 
{
-                                       closeLocalRegistry();
-                               }
-
-                               private void closeLocalRegistry() {
-                                       if 
(cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
-                                               try {
-                                                       
snapshotCloseableRegistry.close();
-                                               } catch (Exception ex) {
-                                                       LOG.warn("Error closing 
local registry", ex);
-                                               }
-                                       }
-                               }
-
-                               @Override
-                               public KeyGroupsStateHandle performOperation() 
throws Exception {
-                                       long startTime = 
System.currentTimeMillis();
-
-                                       if (isStopped()) {
-                                               throw new IOException("RocksDB 
closed.");
-                                       }
-
-                                       snapshotOperation.writeDBSnapshot();
-
-                                       LOG.info("Asynchronous RocksDB snapshot 
({}, asynchronous part) in thread {} took {} ms.",
-                                               streamFactory, 
Thread.currentThread(), (System.currentTimeMillis() - startTime));
-
-                                       return 
snapshotOperation.getSnapshotResultStateHandle();
-                               }
-                       };
-
-               LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) 
in thread {} took {} ms.",
-                               streamFactory, Thread.currentThread(), 
(System.currentTimeMillis() - startTime));
-
-               return AsyncStoppableTaskWithCallback.from(ioCallable);
-       }
-
-       /**
-        * Encapsulates the process to perform a snapshot of a 
RocksDBKeyedStateBackend.
-        */
-       static final class RocksDBFullSnapshotOperation<K> {
-
-               static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
-               static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
-
-               private final RocksDBKeyedStateBackend<K> stateBackend;
-               private final KeyGroupRangeOffsets keyGroupRangeOffsets;
-               private final CheckpointStreamFactory checkpointStreamFactory;
-               private final CloseableRegistry snapshotCloseableRegistry;
-               private final ResourceGuard.Lease dbLease;
-
-               private long checkpointId;
-               private long checkpointTimeStamp;
-
-               private Snapshot snapshot;
-               private ReadOptions readOptions;
-               private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
-
-               private CheckpointStreamFactory.CheckpointStateOutputStream 
outStream;
-               private DataOutputView outputView;
-
-               RocksDBFullSnapshotOperation(
-                       RocksDBKeyedStateBackend<K> stateBackend,
-                       CheckpointStreamFactory checkpointStreamFactory,
-                       CloseableRegistry registry) throws IOException {
-
-                       this.stateBackend = stateBackend;
-                       this.checkpointStreamFactory = checkpointStreamFactory;
-                       this.keyGroupRangeOffsets = new 
KeyGroupRangeOffsets(stateBackend.keyGroupRange);
-                       this.snapshotCloseableRegistry = registry;
-                       this.dbLease = 
this.stateBackend.rocksDBResourceGuard.acquireResource();
-               }
-
-               /**
-                * 1) Create a snapshot object from RocksDB.
-                *
-                * @param checkpointId id of the checkpoint for which we take 
the snapshot
-                * @param checkpointTimeStamp timestamp of the checkpoint for 
which we take the snapshot
-                */
-               public void takeDBSnapShot(long checkpointId, long 
checkpointTimeStamp) {
-                       Preconditions.checkArgument(snapshot == null, "Only one 
ongoing snapshot allowed!");
-                       this.kvStateIterators = new 
ArrayList<>(stateBackend.kvStateInformation.size());
-                       this.checkpointId = checkpointId;
-                       this.checkpointTimeStamp = checkpointTimeStamp;
-                       this.snapshot = stateBackend.db.getSnapshot();
-               }
-
-               /**
-                * 2) Open CheckpointStateOutputStream through the 
checkpointStreamFactory into which we will write.
-                *
-                * @throws Exception
-                */
-               public void openCheckpointStream() throws Exception {
-                       Preconditions.checkArgument(outStream == null, "Output 
stream for snapshot is already set.");
-                       outStream = 
checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
-                       snapshotCloseableRegistry.registerCloseable(outStream);
-                       outputView = new DataOutputViewStreamWrapper(outStream);
-               }
-
-               /**
-                * 3) Write the actual data from RocksDB from the time we took 
the snapshot object in (1).
-                *
-                * @throws IOException
-                */
-               public void writeDBSnapshot() throws IOException, 
InterruptedException {
-
-                       if (null == snapshot) {
-                               throw new IOException("No snapshot available. 
Might be released due to cancellation.");
-                       }
-
-                       Preconditions.checkNotNull(outStream, "No output stream 
to write snapshot.");
-                       writeKVStateMetaData();
-                       writeKVStateData();
-               }
-
-               /**
-                * 4) Returns a state handle to the snapshot after the snapshot 
procedure is completed and null before.
-                *
-                * @return state handle to the completed snapshot
-                */
-               public KeyGroupsStateHandle getSnapshotResultStateHandle() 
throws IOException {
-
-                       if 
(snapshotCloseableRegistry.unregisterCloseable(outStream)) {
-
-                               StreamStateHandle stateHandle = 
outStream.closeAndGetHandle();
-                               outStream = null;
-
-                               if (stateHandle != null) {
-                                       return new 
KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
-                               }
-                       }
-                       return null;
-               }
-
-               /**
-                * 5) Release the snapshot object for RocksDB and clean up.
-                */
-               public void releaseSnapshotResources() {
-
-                       outStream = null;
-
-                       if (null != kvStateIterators) {
-                               for (Tuple2<RocksIterator, Integer> 
kvStateIterator : kvStateIterators) {
-                                       
IOUtils.closeQuietly(kvStateIterator.f0);
-                               }
-                               kvStateIterators = null;
-                       }
-
-                       if (null != snapshot) {
-                               if (null != stateBackend.db) {
-                                       
stateBackend.db.releaseSnapshot(snapshot);
-                               }
-                               IOUtils.closeQuietly(snapshot);
-                               snapshot = null;
-                       }
-
-                       if (null != readOptions) {
-                               IOUtils.closeQuietly(readOptions);
-                               readOptions = null;
-                       }
-
-                       this.dbLease.close();
-               }
-
-               private void writeKVStateMetaData() throws IOException {
-
-                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> metaInfoSnapshots =
-                               new 
ArrayList<>(stateBackend.kvStateInformation.size());
-
-                       int kvStateId = 0;
-                       for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
-                               stateBackend.kvStateInformation.entrySet()) {
-
-                               
metaInfoSnapshots.add(column.getValue().f1.snapshot());
-
-                               //retrieve iterator for this k/v states
-                               readOptions = new ReadOptions();
-                               readOptions.setSnapshot(snapshot);
-
-                               kvStateIterators.add(
-                                       new 
Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), 
kvStateId));
-
-                               ++kvStateId;
-                       }
-
-                       KeyedBackendSerializationProxy<K> serializationProxy =
-                               new KeyedBackendSerializationProxy<>(
-                                       stateBackend.getKeySerializer(),
-                                       metaInfoSnapshots,
-                                       
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
stateBackend.keyGroupCompressionDecorator));
-
-                       serializationProxy.write(outputView);
-               }
-
-               private void writeKVStateData() throws IOException, 
InterruptedException {
-
-                       byte[] previousKey = null;
-                       byte[] previousValue = null;
-                       OutputStream kgOutStream = null;
-                       DataOutputView kgOutView = null;
-
-                       try {
-                               // Here we transfer ownership of RocksIterators 
to the RocksDBMergeIterator
-                               try (RocksDBMergeIterator mergeIterator = new 
RocksDBMergeIterator(
-                                       kvStateIterators, 
stateBackend.keyGroupPrefixBytes)) {
-
-                                       // handover complete, null out to 
prevent double close
-                                       kvStateIterators = null;
-
-                                       //preamble: setup with first key-group 
as our lookahead
-                                       if (mergeIterator.isValid()) {
-                                               //begin first key-group by 
recording the offset
-                                               
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
-                                               //write the k/v-state id as 
metadata
-                                               kgOutStream = 
stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
-                                               kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
-                                               //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                                               
kgOutView.writeShort(mergeIterator.kvStateId());
-                                               previousKey = 
mergeIterator.key();
-                                               previousValue = 
mergeIterator.value();
-                                               mergeIterator.next();
-                                       }
-
-                                       //main loop: write k/v pairs ordered by 
(key-group, kv-state), thereby tracking key-group offsets.
-                                       while (mergeIterator.isValid()) {
-
-                                               assert 
(!hasMetaDataFollowsFlag(previousKey));
-
-                                               //set signal in first key byte 
that meta data will follow in the stream after this k/v pair
-                                               if 
(mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
-
-                                                       //be cooperative and 
check for interruption from time to time in the hot loop
-                                                       checkInterrupted();
-
-                                                       
setMetaDataFollowsFlagInKey(previousKey);
-                                               }
-
-                                               writeKeyValuePair(previousKey, 
previousValue, kgOutView);
-
-                                               //write meta data if we have to
-                                               if 
(mergeIterator.isNewKeyGroup()) {
-                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
-                                                       
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
-                                                       // this will just close 
the outer stream
-                                                       kgOutStream.close();
-                                                       //begin new key-group
-                                                       
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), 
outStream.getPos());
-                                                       //write the kev-state
-                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
-                                                       kgOutStream = 
stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
-                                                       kgOutView = new 
DataOutputViewStreamWrapper(kgOutStream);
-                                                       
kgOutView.writeShort(mergeIterator.kvStateId());
-                                               } else if 
(mergeIterator.isNewKeyValueState()) {
-                                                       //write the k/v-state
-                                                       //TODO this could be 
aware of keyGroupPrefixBytes and write only one byte if possible
-                                                       
kgOutView.writeShort(mergeIterator.kvStateId());
-                                               }
-
-                                               //request next k/v pair
-                                               previousKey = 
mergeIterator.key();
-                                               previousValue = 
mergeIterator.value();
-                                               mergeIterator.next();
-                                       }
-                               }
-
-                               //epilogue: write last key-group
-                               if (previousKey != null) {
-                                       assert 
(!hasMetaDataFollowsFlag(previousKey));
-                                       
setMetaDataFollowsFlagInKey(previousKey);
-                                       writeKeyValuePair(previousKey, 
previousValue, kgOutView);
-                                       //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                                       
kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
-                                       // this will just close the outer stream
-                                       kgOutStream.close();
-                                       kgOutStream = null;
-                               }
-
-                       } finally {
-                               // this will just close the outer stream
-                               IOUtils.closeQuietly(kgOutStream);
-                       }
-               }
-
-               private void writeKeyValuePair(byte[] key, byte[] value, 
DataOutputView out) throws IOException {
-                       BytePrimitiveArraySerializer.INSTANCE.serialize(key, 
out);
-                       BytePrimitiveArraySerializer.INSTANCE.serialize(value, 
out);
-               }
-
-               static void setMetaDataFollowsFlagInKey(byte[] key) {
-                       key[0] |= FIRST_BIT_IN_BYTE_MASK;
-               }
-
-               static void clearMetaDataFollowsFlag(byte[] key) {
-                       key[0] &= 
(~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
-               }
-
-               static boolean hasMetaDataFollowsFlag(byte[] key) {
-                       return 0 != (key[0] & 
RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
-               }
-
-               private static void checkInterrupted() throws 
InterruptedException {
-                       if (Thread.currentThread().isInterrupted()) {
-                               throw new InterruptedException("RocksDB 
snapshot interrupted.");
-                       }
-               }
-       }
-
-       private static final class RocksDBIncrementalSnapshotOperation<K> {
-
-               /** The backend which we snapshot. */
-               private final RocksDBKeyedStateBackend<K> stateBackend;
-
-               /** Stream factory that creates the outpus streams to DFS. */
-               private final CheckpointStreamFactory checkpointStreamFactory;
-
-               /** Id for the current checkpoint. */
-               private final long checkpointId;
-
-               /** Timestamp for the current checkpoint. */
-               private final long checkpointTimestamp;
-
-               /** All sst files that were part of the last previously 
completed checkpoint. */
-               private Set<StateHandleID> baseSstFiles;
-
-               /** The state meta data. */
-               private final 
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
= new ArrayList<>();
-
-               private FileSystem backupFileSystem;
-               private Path backupPath;
-
-               // Registry for all opened i/o streams
-               private final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
-
-               // new sst files since the last completed checkpoint
-               private final Map<StateHandleID, StreamStateHandle> sstFiles = 
new HashMap<>();
-
-               // handles to the misc files in the current snapshot
-               private final Map<StateHandleID, StreamStateHandle> miscFiles = 
new HashMap<>();
-
-               // This lease protects from concurrent disposal of the native 
rocksdb instance.
-               private final ResourceGuard.Lease dbLease;
-
-               private StreamStateHandle metaStateHandle = null;
-
-               private RocksDBIncrementalSnapshotOperation(
-                       RocksDBKeyedStateBackend<K> stateBackend,
-                       CheckpointStreamFactory checkpointStreamFactory,
-                       long checkpointId,
-                       long checkpointTimestamp) throws IOException {
-
-                       this.stateBackend = stateBackend;
-                       this.checkpointStreamFactory = checkpointStreamFactory;
-                       this.checkpointId = checkpointId;
-                       this.checkpointTimestamp = checkpointTimestamp;
-                       this.dbLease = 
this.stateBackend.rocksDBResourceGuard.acquireResource();
-               }
-
-               private StreamStateHandle materializeStateData(Path filePath) 
throws Exception {
-                       FSDataInputStream inputStream = null;
-                       CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
-
-                       try {
-                               final byte[] buffer = new byte[8 * 1024];
-
-                               FileSystem backupFileSystem = 
backupPath.getFileSystem();
-                               inputStream = backupFileSystem.open(filePath);
-                               
closeableRegistry.registerCloseable(inputStream);
-
-                               outputStream = checkpointStreamFactory
-                                       
.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
-                               
closeableRegistry.registerCloseable(outputStream);
-
-                               while (true) {
-                                       int numBytes = inputStream.read(buffer);
-
-                                       if (numBytes == -1) {
-                                               break;
-                                       }
-
-                                       outputStream.write(buffer, 0, numBytes);
-                               }
-
-                               StreamStateHandle result = null;
-                               if 
(closeableRegistry.unregisterCloseable(outputStream)) {
-                                       result = 
outputStream.closeAndGetHandle();
-                                       outputStream = null;
-                               }
-                               return result;
-
-                       } finally {
-                               if (inputStream != null && 
closeableRegistry.unregisterCloseable(inputStream)) {
-                                       inputStream.close();
-                               }
-
-                               if (outputStream != null && 
closeableRegistry.unregisterCloseable(outputStream)) {
-                                       outputStream.close();
-                               }
-                       }
-               }
-
-               private StreamStateHandle materializeMetaData() throws 
Exception {
-                       CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = null;
-
-                       try {
-                               outputStream = checkpointStreamFactory
-                                       
.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
-                               
closeableRegistry.registerCloseable(outputStream);
-
-                               //no need for compression scheme support 
because sst-files are already compressed
-                               KeyedBackendSerializationProxy<K> 
serializationProxy =
-                                       new KeyedBackendSerializationProxy<>(
-                                               stateBackend.keySerializer,
-                                               stateMetaInfoSnapshots,
-                                               false);
-
-                               DataOutputView out = new 
DataOutputViewStreamWrapper(outputStream);
-
-                               serializationProxy.write(out);
-
-                               StreamStateHandle result = null;
-                               if 
(closeableRegistry.unregisterCloseable(outputStream)) {
-                                       result = 
outputStream.closeAndGetHandle();
-                                       outputStream = null;
-                               }
-                               return result;
-                       } finally {
-                               if (outputStream != null) {
-                                       if 
(closeableRegistry.unregisterCloseable(outputStream)) {
-                                               outputStream.close();
-                                       }
-                               }
-                       }
-               }
-
-               void takeSnapshot() throws Exception {
-
-                       final long lastCompletedCheckpoint;
-
-                       // use the last completed checkpoint as the comparison 
base.
-                       synchronized (stateBackend.materializedSstFiles) {
-                               lastCompletedCheckpoint = 
stateBackend.lastCompletedCheckpointId;
-                               baseSstFiles = 
stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
-                       }
-
-                       LOG.trace("Taking incremental snapshot for checkpoint 
{}. Snapshot is based on last completed checkpoint {} " +
-                               "assuming the following (shared) files as base: 
{}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
-
-                       // save meta data
-                       for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
-                               : stateBackend.kvStateInformation.entrySet()) {
-                               
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
-                       }
-
-                       // save state data
-                       backupPath = new 
Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
-
-                       LOG.trace("Local RocksDB checkpoint goes to backup path 
{}.", backupPath);
-
-                       backupFileSystem = backupPath.getFileSystem();
-                       if (backupFileSystem.exists(backupPath)) {
-                               throw new IllegalStateException("Unexpected 
existence of the backup directory.");
-                       }
-
-                       // create hard links of living files in the checkpoint 
path
-                       Checkpoint checkpoint = 
Checkpoint.create(stateBackend.db);
-                       checkpoint.createCheckpoint(backupPath.getPath());
-               }
-
-               KeyedStateHandle materializeSnapshot() throws Exception {
-
-                       
stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);
-
-                       // write meta data
-                       metaStateHandle = materializeMetaData();
-
-                       // write state data
-                       
Preconditions.checkState(backupFileSystem.exists(backupPath));
-
-                       FileStatus[] fileStatuses = 
backupFileSystem.listStatus(backupPath);
-                       if (fileStatuses != null) {
-                               for (FileStatus fileStatus : fileStatuses) {
-                                       final Path filePath = 
fileStatus.getPath();
-                                       final String fileName = 
filePath.getName();
-                                       final StateHandleID stateHandleID = new 
StateHandleID(fileName);
-
-                                       if (fileName.endsWith(SST_FILE_SUFFIX)) 
{
-                                               final boolean existsAlready =
-                                                       baseSstFiles != null && 
baseSstFiles.contains(stateHandleID);
-
-                                               if (existsAlready) {
-                                                       // we introduce a 
placeholder state handle, that is replaced with the
-                                                       // original from the 
shared state registry (created from a previous checkpoint)
-                                                       sstFiles.put(
-                                                               stateHandleID,
-                                                               new 
PlaceholderStreamStateHandle());
-                                               } else {
-                                                       
sstFiles.put(stateHandleID, materializeStateData(filePath));
-                                               }
-                                       } else {
-                                               StreamStateHandle fileHandle = 
materializeStateData(filePath);
-                                               miscFiles.put(stateHandleID, 
fileHandle);
-                                       }
-                               }
-                       }
-
-                       synchronized (stateBackend.materializedSstFiles) {
-                               
stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
-                       }
-
-                       return new IncrementalKeyedStateHandle(
-                               stateBackend.backendUID,
-                               stateBackend.keyGroupRange,
-                               checkpointId,
-                               sstFiles,
-                               miscFiles,
-                               metaStateHandle);
-               }
-
-               void stop() {
-
-                       if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
-                               try {
-                                       closeableRegistry.close();
-                               } catch (IOException e) {
-                                       LOG.warn("Could not properly close io 
streams.", e);
-                               }
-                       }
-               }
-
-               void releaseResources(boolean canceled) {
-
-                       dbLease.close();
-
-                       if 
(stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
-                               try {
-                                       closeableRegistry.close();
-                               } catch (IOException e) {
-                                       LOG.warn("Exception on closing 
registry.", e);
-                               }
-                       }
-
-                       if (backupPath != null) {
-                               try {
-                                       if 
(backupFileSystem.exists(backupPath)) {
-
-                                               LOG.trace("Deleting local 
RocksDB backup path {}.", backupPath);
-                                               
backupFileSystem.delete(backupPath, true);
-                                       }
-                               } catch (Exception e) {
-                                       LOG.warn("Could not properly delete the 
checkpoint directory.", e);
-                               }
-                       }
-
-                       if (canceled) {
-                               Collection<StateObject> statesToDiscard =
-                                       new ArrayList<>(1 + miscFiles.size() + 
sstFiles.size());
-
-                               statesToDiscard.add(metaStateHandle);
-                               statesToDiscard.addAll(miscFiles.values());
-                               statesToDiscard.addAll(sstFiles.values());
-
-                               try {
-                                       
StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
-                               } catch (Exception e) {
-                                       LOG.warn("Could not properly discard 
states.", e);
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public void restore(Collection<KeyedStateHandle> restoreState) throws 
Exception {
-               LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
-               }
-
-               // clear all meta data
-               kvStateInformation.clear();
-               restoredKvStateMetaInfos.clear();
-
-               try {
-                       if (restoreState == null || restoreState.isEmpty()) {
-                               createDB();
-                       } else if (restoreState.iterator().next() instanceof 
IncrementalKeyedStateHandle) {
-                               RocksDBIncrementalRestoreOperation<K> 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
-                               restoreOperation.restore(restoreState);
-                       } else {
-                               RocksDBFullRestoreOperation<K> restoreOperation 
= new RocksDBFullRestoreOperation<>(this);
-                               restoreOperation.doRestore(restoreState);
-                       }
-               } catch (Exception ex) {
-                       dispose();
-                       throw ex;
-               }
-       }
-
-       @Override
-       public void notifyCheckpointComplete(long completedCheckpointId) {
-               synchronized (materializedSstFiles) {
-                       if (completedCheckpointId < lastCompletedCheckpointId) {
-                               return;
-                       }
-
-                       materializedSstFiles.keySet().removeIf(checkpointId -> 
checkpointId < completedCheckpointId);
-
-                       lastCompletedCheckpointId = completedCheckpointId;
-               }
-       }
-
-       private void createDB() throws IOException {
-               List<ColumnFamilyHandle> columnFamilyHandles = new 
ArrayList<>(1);
-               this.db = openDB(instanceRocksDBPath.getAbsolutePath(), 
Collections.emptyList(), columnFamilyHandles);
-               this.defaultColumnFamily = columnFamilyHandles.get(0);
-       }
-
-       private RocksDB openDB(
-               String path,
-               List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
-               List<ColumnFamilyHandle> stateColumnFamilyHandles) throws 
IOException {
-
-               List<ColumnFamilyDescriptor> columnFamilyDescriptors =
-                       new ArrayList<>(1 + 
stateColumnFamilyDescriptors.size());
-
-               columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
-
-               // we add the required descriptor for the default CF in last 
position.
-               columnFamilyDescriptors.add(new 
ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));
-
-               RocksDB dbRef;
-
-               try {
-                       dbRef = RocksDB.open(
-                               Preconditions.checkNotNull(dbOptions),
-                               Preconditions.checkNotNull(path),
-                               columnFamilyDescriptors,
-                               stateColumnFamilyHandles);
-               } catch (RocksDBException e) {
-                       throw new IOException("Error while opening RocksDB 
instance.", e);
-               }
-
-               // requested + default CF
-               Preconditions.checkState(1 + 
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
-                       "Not all requested column family handles have been 
created");
-
-               return dbRef;
-       }
-
-       /**
-        * Encapsulates the process of restoring a RocksDBKeyedStateBackend 
from a snapshot.
-        */
-       static final class RocksDBFullRestoreOperation<K> {
-
-               private final RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend;
-
-               /** Current key-groups state handle from which we restore 
key-groups. */
-               private KeyGroupsStateHandle currentKeyGroupsStateHandle;
-               /** Current input stream we obtained from 
currentKeyGroupsStateHandle. */
-               private FSDataInputStream currentStateHandleInStream;
-               /** Current data input view that wraps 
currentStateHandleInStream. */
-               private DataInputView currentStateHandleInView;
-               /** Current list of ColumnFamilyHandles for all column families 
we restore from currentKeyGroupsStateHandle. */
-               private List<ColumnFamilyHandle> 
currentStateHandleKVStateColumnFamilies;
-               /** The compression decorator that was used for writing the 
state, as determined by the meta data. */
-               private StreamCompressionDecorator 
keygroupStreamCompressionDecorator;
-
-               /**
-                * Creates a restore operation object for the given state 
backend instance.
-                *
-                * @param rocksDBKeyedStateBackend the state backend into which 
we restore
-                */
-               public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> 
rocksDBKeyedStateBackend) {
-                       this.rocksDBKeyedStateBackend = 
Preconditions.checkNotNull(rocksDBKeyedStateBackend);
-               }
-
-               /**
-                * Restores all key-groups data that is referenced by the 
passed state handles.
-                *
-                * @param keyedStateHandles List of all key groups state 
handles that shall be restored.
-                */
-               public void doRestore(Collection<KeyedStateHandle> 
keyedStateHandles)
-                       throws IOException, StateMigrationException, 
RocksDBException {
-
-                       rocksDBKeyedStateBackend.createDB();
-
-                       for (KeyedStateHandle keyedStateHandle : 
keyedStateHandles) {
-                               if (keyedStateHandle != null) {
-
-                                       if (!(keyedStateHandle instanceof 
KeyGroupsStateHandle)) {
-                                               throw new 
IllegalStateException("Unexpected state handle type, " +
-                                                       "expected: " + 
KeyGroupsStateHandle.class +
-                                                       ", but found: " + 
keyedStateHandle.getClass());
-                                       }
-                                       this.currentKeyGroupsStateHandle = 
(KeyGroupsStateHandle) keyedStateHandle;
-                                       restoreKeyGroupsInStateHandle();
-                               }
-                       }
-               }
-
-               /**
-                * Restore one key groups state handle.
-                */
-               private void restoreKeyGroupsInStateHandle()
-                       throws IOException, StateMigrationException, 
RocksDBException {
-                       try {
-                               currentStateHandleInStream = 
currentKeyGroupsStateHandle.openInputStream();
-                               
rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
-                               currentStateHandleInView = new 
DataInputViewStreamWrapper(currentStateHandleInStream);
-                               restoreKVStateMetaData();
-                               restoreKVStateData();
-                       } finally {
-                               if (currentStateHandleInStream != null
-                                       && 
rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream))
 {
-                                       
IOUtils.closeQuietly(currentStateHandleInStream);
-                               }
-                       }
-               }
-
-               /**
-                * Restore the KV-state / ColumnFamily meta data for all 
key-groups referenced by the current state handle.
-                *
-                * @throws IOException
-                * @throws ClassNotFoundException
-                * @throws RocksDBException
-                */
-               private void restoreKVStateMetaData() throws IOException, 
StateMigrationException, RocksDBException {
-
-                       KeyedBackendSerializationProxy<K> serializationProxy =
-                               new 
KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
-
-                       serializationProxy.read(currentStateHandleInView);
-
-                       // check for key serializer compatibility; this also 
reconfigures the
-                       // key serializer to be compatible, if it is required 
and is possible
-                       if (CompatibilityUtil.resolveCompatibilityResult(
-                               serializationProxy.getKeySerializer(),
-                               UnloadableDummyTypeSerializer.class,
-                               
serializationProxy.getKeySerializerConfigSnapshot(),
-                               rocksDBKeyedStateBackend.keySerializer)
-                               .isRequiresMigration()) {
-
-                               // TODO replace with state migration; note that 
key hash codes need to remain the same after migration
-                               throw new StateMigrationException("The new key 
serializer is not compatible to read previous keys. " +
-                                       "Aborting now since state migration is 
currently not available");
-                       }
-
-                       this.keygroupStreamCompressionDecorator = 
serializationProxy.isUsingKeyGroupCompression() ?
-                               SnappyStreamCompressionDecorator.INSTANCE : 
UncompressedStreamCompressionDecorator.INSTANCE;
-
-                       List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> restoredMetaInfos =
-                               serializationProxy.getStateMetaInfoSnapshots();
-                       currentStateHandleKVStateColumnFamilies = new 
ArrayList<>(restoredMetaInfos.size());
-                       //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = 
new HashMap<>(restoredMetaInfos.size());
-
-                       for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> 
restoredMetaInfo : restoredMetaInfos) {
-
-                               Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
-                                       
rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
-
-                               if (registeredColumn == null) {
-                                       byte[] nameBytes = 
restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
-
-                                       ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
-                                               nameBytes,
-                                               
rocksDBKeyedStateBackend.columnOptions);
-
-                                       RegisteredKeyedBackendStateMetaInfo<?, 
?> stateMetaInfo =
-                                               new 
RegisteredKeyedBackendStateMetaInfo<>(
-                                                       
restoredMetaInfo.getStateType(),
-                                                       
restoredMetaInfo.getName(),
-                                                       
restoredMetaInfo.getNamespaceSerializer(),
-                                                       
restoredMetaInfo.getStateSerializer());
-
-                                       
rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(),
 restoredMetaInfo);
-
-                                       ColumnFamilyHandle columnFamily = 
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
-
-                                       registeredColumn = new 
Tuple2<>(columnFamily, stateMetaInfo);
-                                       
rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), 
registeredColumn);
-
-                               } else {
-                                       // TODO with eager state registration 
in place, check here for serializer migration strategies
-                               }
-                               
currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
-                       }
-               }
-
-               /**
-                * Restore the KV-state / ColumnFamily data for all key-groups 
referenced by the current state handle.
-                *
-                * @throws IOException
-                * @throws RocksDBException
-                */
-               private void restoreKVStateData() throws IOException, 
RocksDBException {
-                       //for all key-groups in the current state handle...
-                       for (Tuple2<Integer, Long> keyGroupOffset : 
currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
-                               int keyGroup = keyGroupOffset.f0;
-
-                               // Check that restored key groups all belong to 
the backend
-                               
Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
-                                       "The key group must belong to the 
backend");
-
-                               long offset = keyGroupOffset.f1;
-                               //not empty key-group?
-                               if (0L != offset) {
-                                       currentStateHandleInStream.seek(offset);
-                                       try (InputStream compressedKgIn = 
keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream))
 {
-                                               DataInputViewStreamWrapper 
compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
-                                               //TODO this could be aware of 
keyGroupPrefixBytes and write only one byte if possible
-                                               int kvStateId = 
compressedKgInputView.readShort();
-                                               ColumnFamilyHandle handle = 
currentStateHandleKVStateColumnFamilies.get(kvStateId);
-                                               //insert all k/v pairs into DB
-                                               boolean keyGroupHasMoreKeys = 
true;
-                                               while (keyGroupHasMoreKeys) {
-                                                       byte[] key = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
-                                                       byte[] value = 
BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
-                                                       if 
(RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
-                                                               //clear the 
signal bit in the key to make it ready for insertion again
-                                                               
RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
-                                                               
rocksDBKeyedStateBackend.db.put(handle, key, value);
-                                                               //TODO this 
could be aware of keyGroupPrefixBytes and write only one byte if possible
-                                                               kvStateId = 
RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
-                                                                       & 
compressedKgInputView.readShort();
-                                                               if 
(RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
-                                                                       
keyGroupHasMoreKeys = false;
-                                                               } else {
-                                                                       handle 
= currentStateHandleKVStateColumnFamilies.get(kvStateId);
-                                                               }
-                                                       } else {
-                                                               
rocksDBKeyedStateBackend.db.put(handle, key, value);
-                                                       }
-                                               }
-                                       }
-                               }
-                       }
-               }
-       }
-
-       private static class RocksDBIncrementalRestoreOperation<T> {
-
-               private final RocksDBKeyedStateBackend<T> stateBackend;
-
-               private 
RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
-                       this.stateBackend = stateBackend;
-               }
-
-               private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, 
?>> readMetaData(
-                       StreamStateHandle metaStateHandle) throws Exception {
-
-                       FSDataInputStream inputStream = null;
-
-                       try {
-                               inputStream = metaStateHandle.openInputStream();
-                               
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
-
-                               KeyedBackendSerializationProxy<T> 
serializationProxy =
-                                       new 
KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
-                               DataInputView in = new 
DataInputViewStreamWrapper(inputStream);
-                               serializationProxy.read(in);
-
-                               // check for key serializer compatibility; this 
also reconfigures the
-                               // key serializer to be compatible, if it is 
required and is possible
-                               if 
(CompatibilityUtil.resolveCompatibilityResult(
-                                       serializationProxy.getKeySerializer(),
-                                       UnloadableDummyTypeSerializer.class,
-                                       
serializationProxy.getKeySerializerConfigSnapshot(),
-                                       stateBackend.keySerializer)
-                                       .isRequiresMigration()) {
-
-                                       // TODO replace with state migration; 
note that key hash codes need to remain the same after migration
-                                       throw new StateMigrationException("The 
new key serializer is not compatible to read previous keys. " +
-                                               "Aborting now since state 
migration is currently not available");
-                               }
-
-                               return 
serializationProxy.getStateMetaInfoSnapshots();
-                       } finally {
-                               if (inputStream != null && 
stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
-                                       inputStream.close();
-                               }
-                       }
-               }
-
-               private void readStateData(
-                       Path restoreFilePath,
-                       StreamStateHandle remoteFileHandle) throws IOException {
-
-                       FileSystem restoreFileSystem = 
restoreFilePath.getFileSystem();
-
-                       FSDataInputStream inputStream = null;
-                       FSDataOutputStream outputStream = null;
-
-                       try {
-                               inputStream = 
remoteFileHandle.openInputStream();
-                               
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
-
-                               outputStream = 
restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
-                               
stateBackend.cancelStreamRegistry.registerCloseable(outputStream);
-
-                               byte[] buffer = new byte[8 * 1024];
-                               while (true) {
-                                       int numBytes = inputStream.read(buffer);
-                                       if (numBytes == -1) {
-                                               break;
-                                       }
-
-                                       outputStream.write(buffer, 0, numBytes);
-                               }
-                       } finally {
-                               if (inputStream != null && 
stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
-                                       inputStream.close();
-                               }
-
-                               if (outputStream != null && 
stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) {
-                                       outputStream.close();
-                               }
-                       }
-               }
-
-               private void restoreInstance(
-                       IncrementalKeyedStateHandle restoreStateHandle,
-                       boolean hasExtraKeys) throws Exception {
-
-                       // read state data
-                       Path restoreInstancePath = new Path(
-                               stateBackend.instanceBasePath.getAbsolutePath(),
-                               UUID.randomUUID().toString());
-
-                       try {
-                               final Map<StateHandleID, StreamStateHandle> 
sstFiles =
-                                       restoreStateHandle.getSharedState();
-                               final Map<StateHandleID, StreamStateHandle> 
miscFiles =
-                                       restoreStateHandle.getPrivateState();
-
-                               readAllStateData(sstFiles, restoreInstancePath);
-                               readAllStateData(miscFiles, 
restoreInstancePath);
-
-                               // read meta data
-                               
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots 
=
-                                       
readMetaData(restoreStateHandle.getMetaStateHandle());
-
-                               List<ColumnFamilyDescriptor> 
columnFamilyDescriptors =
-                                       new ArrayList<>(1 + 
stateMetaInfoSnapshots.size());
-
-                               for 
(RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : 
stateMetaInfoSnapshots) {
-
-                                       ColumnFamilyDescriptor 
columnFamilyDescriptor = new ColumnFamilyDescriptor(
-                                               
stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
-                                               stateBackend.columnOptions);
-
-                                       
columnFamilyDescriptors.add(columnFamilyDescriptor);
-                                       
stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), 
stateMetaInfoSnapshot);
-                               }
-
-                               if (hasExtraKeys) {
-
-                                       List<ColumnFamilyHandle> 
columnFamilyHandles =
-                                               new ArrayList<>(1 + 
columnFamilyDescriptors.size());
-
-                                       try (RocksDB restoreDb = 
stateBackend.openDB(
-                                               restoreInstancePath.getPath(),
-                                               columnFamilyDescriptors,
-                                               columnFamilyHandles)) {
-
-                                               try {
-                                                       // iterating only the 
requested descriptors automatically skips the default column family handle
-                                                       for (int i = 0; i < 
columnFamilyDescriptors.size(); ++i) {
-                                                               
ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
-                                                               
ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i);
-                                                               
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = 
stateMetaInfoSnapshots.get(i);
-
-                                                               
Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> 
registeredStateMetaInfoEntry =
-                                                                       
stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
-
-                                                               if (null == 
registeredStateMetaInfoEntry) {
-
-                                                                       
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
-                                                                               
new RegisteredKeyedBackendStateMetaInfo<>(
-                                                                               
        stateMetaInfoSnapshot.getStateType(),
-                                                                               
        stateMetaInfoSnapshot.getName(),
-                                                                               
        stateMetaInfoSnapshot.getNamespaceSerializer(),
-                                                                               
        stateMetaInfoSnapshot.getStateSerializer());
-
-                                                                       
registeredStateMetaInfoEntry =
-                                                                               
new Tuple2<>(
-                                                                               
        stateBackend.db.createColumnFamily(columnFamilyDescriptor),
-                                                                               
        stateMetaInfo);
-
-                                                                       
stateBackend.kvStateInformation.put(
-                                                                               
stateMetaInfoSnapshot.getName(),
-                                                                               
registeredStateMetaInfoEntry);
-                                                               }
-
-                                                               
ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
-
-                                                               try 
(RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) {
-
-                                                                       int 
startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
-                                                                       byte[] 
startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
-                                                                       for 
(int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
-                                                                               
startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> 
((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
-                                                                       }
-
-                                                                       
iterator.seek(startKeyGroupPrefixBytes);
-
-                                                                       while 
(iterator.isValid()) {
-
-                                                                               
int keyGroup = 0;
-                                                                               
for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
-                                                                               
        keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
-                                                                               
}
-
-                                                                               
if (stateBackend.keyGroupRange.contains(keyGroup)) {
-                                                                               
        stateBackend.db.put(targetColumnFamilyHandle,
-                                                                               
                iterator.key(), iterator.value());
-                                                                               
}
-
-                                                                               
iterator.next();
-                                                                       }
-                                                               } // releases 
native iterator resources
-                                                       }
-                                               } finally {
-                                                       //release native tmp db 
column family resources
-                                                       for (ColumnFamilyHandle 
columnFamilyHandle : columnFamilyHandles) {
-                                                               
IOUtils.closeQuietly(columnFamilyHandle);
-                                                       }
-                                               }
-                                       } // releases native tmp db resources
-                               } else {
-                                       // pick up again the old backend id, so 
the we can reference existing state
-                                       stateBackend.backendUID = 
restoreStateHandle.getBackendIdentifier();
-
-                                       LOG.debug("Restoring keyed backend uid 
in operator {} from incremental snapshot to {}.",
-                                               
stateBackend.operatorIdentifier, stateBackend.backendUID);
-
-                                       // create hard links in the instance 
directory
-                                       if 
(!stateBackend.instanceRocksDBPath.mkdirs()) {
-                                               throw new IOException("Could 
not create RocksDB data directory.");
-                                       }
-
-                                       
createFileHardLinksInRestorePath(sstFiles, restoreInstancePath);
-                                       
createFileHardLinksInRestorePath(miscFiles, restoreInstancePath);
-
-                                       List<ColumnFamilyHandle> 
columnFamilyHandles =
-                                               new ArrayList<>(1 + 
columnFamilyDescriptors.size());
-
-                                       stateBackend.db = stateBackend.openDB(
-                                               
stateBackend.instanceRocksDBPath.getAbsolutePath(),
-                                               columnFamilyDescriptors, 
columnFamilyHandles);
-
-                                       // extract and store the default column 
family which is located at the last index
-                                       stateBackend.defaultColumnFamily = 
columnFamilyHandles.remove(columnFamilyHandles.size() - 1);
-
-                                       for (int i = 0; i < 
columnFamilyDescriptors.size(); ++i) {
-                                               
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = 
stateMetaInfoSnapshots.get(i);
-
-                                               ColumnFamilyHandle 
columnFamilyHandle = columnFamilyHandles.get(i);
-                                               
RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
-                                                       new 
RegisteredKeyedBackendStateMetaInfo<>(
-                                                               
stateMetaInfoSnapshot.getStateType(),
-                                                               
stateMetaInfoSnapshot.getName(),
-                                                               
stateMetaInfoSnapshot.getNamespaceSerializer(),
-                                                               
stateMetaInfoSnapshot.getStateSerializer());
-
-                                               
stateBackend.kvStateInformation.put(
-                                                       
stateMetaInfoSnapshot.getName(),
-                                                       new 
Tuple2<>(columnFamilyHandle, stateMetaInfo));
-                                       }
-
-                                       // use the restore sst files as the 
base for succeeding checkpoints
-                                       synchronized 
(stateBackend.materializedSstFiles) {
-                                               
stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), 
sstFiles.keySet());
-                                       }
-
-                                       stateBackend.lastCompletedCheckpointId 
= restoreStateHandle.getCheckpointId();
-                               }
-                       } finally {
-                               FileSystem restoreFileSystem = 
restoreInstancePath.getFileSystem();
-                               if 
(restoreFileSystem.exists(restoreInstancePath)) {
-                                       
restoreFileSystem.delete(restoreInstancePath, true);
-                               }
-                       }
-               }
-
-               private void readAllStateData(
-                       Map<StateHandleID, StreamStateHandle> stateHandleMap,
-                       Path restoreInstancePath) throws IOException {
-
-                       for (Map.Entry<StateHandleID, StreamStateHandle> entry 
: stateHandleMap.entrySet()) {
-                               StateHandleID stateHandleID = entry.getKey();
-                               StreamStateHandle remoteFileHandle = 
entry.getValue();
-                               readStateData(new Path(restoreInstancePath, 
stateHandleID.toString()), remoteFileHandle);
-                       }
-               }
-
-               private void createFileHardLinksInRestorePath(
-                       Map<StateHandleID, StreamStateHandle> stateHandleMap,
-                       Path restoreInstancePath) throws IOException {
-
-                       for (StateHandleID stateHandleID : 
stateHandleMap.keySet()) {
-                               String newSstFileName = 
stateHandleID.toString();
-                               File restoreFile = new 
File(restoreInstancePath.getPath(), newSstFileName);
-                               File targetFile = new 
File(stateBackend.instanceRocksDBPath, newSstFileName);
-                               Files.createLink(targetFile.toPath(), 
restoreFile.toPath());
-                       }
-               }
-
-               void restore(Collection<KeyedStateHandle> restoreStateHandles) 
throws Exception {
-
-                       boolean hasExtraKeys = (restoreStateHandles.size() > 1 
||
-                               
!Objects.equals(restoreStateHandles.iterator().next().getKeyGroupRange(), 
stateBackend.keyGroupRange));
-
-                       if (hasExtraKeys) {
-                               stateBackend.createDB();
-                       }
-
-                       for (KeyedStateHandle rawStateHandle : 
restoreStateHandles) {
-
-                               if (!(rawStateHandle instanceof 
IncrementalKeyedStateHandle)) {
-                                       throw new 
IllegalStateException("Unexpected state handle type, " +
-                                               "expected " + 
IncrementalKeyedStateHandle.class +
-                                               ", but found " + 
rawStateHandle.getClass());
-                               }
-
-                               IncrementalKeyedStateHandle keyedStateHandle = 
(IncrementalKeyedStateHandle) rawStateHandle;
-
-                               restoreInstance(keyedStateHandle, hasExtraKeys);
-                       }
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  State factories
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a column family handle for use with a k/v state. When 
restoring from a snapshot
-        * we don't restore the individual k/v states, just the global RocksDB 
database and the
-        * list of column families. When a k/v state is first requested we 
check here whether we
-        * already have a column family for that and return it or create a new 
one if it doesn't exist.
-        *
-        * <p>This also checks whether the {@link StateDescriptor} for a state 
matches the one
-        * that we checkpointed, i.e. is already in the map of column families.
-        */
-       @SuppressWarnings("rawtypes, unchecked")
-       protected <N, S> ColumnFamilyHandle getColumnFamily(
-               StateDescriptor<?, S> descriptor, TypeSerializer<N> 
namespaceSerializer) throws IOException, StateMigrationException {
-
-               Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
-                       kvStateInformation.get(descriptor.getName());
-
-               RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new 
RegisteredKeyedBackendStateMetaInfo<>(
-                       descriptor.getType(),
-                       descriptor.getName(),
-                       namespaceSerializer,
-                       descriptor.getSerializer());
-
-               if (stateInfo != null) {
-                       // TODO with eager registration in place, these checks 
should be moved to restore()
-
-                       RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> 
restoredMetaInfo =
-                               
(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) 
restoredKvStateMetaInfos.get(descriptor.getName());
-
-                       Preconditions.checkState(
-                               Objects.equals(newMetaInfo.getName(), 
restoredMetaInfo.getName()),
-                               "Incompatible state names. " +
-                                       "Was [" + restoredMetaInfo.getName() + 
"], " +
-                                       "registered with [" + 
newMetaInfo.getName() + "].");
-
-                       if (!Objects.equals(newMetaInfo.getStateType(), 
StateDescriptor.Type.UNKNOWN)
-                               && 
!Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) 
{
-
-                               Preconditions.checkState(
-                                       newMetaInfo.getStateType() == 
restoredMetaInfo.getStateType(),
-                                       "Incompatible state types. " +
-                                               "Was [" + 
restoredMetaInfo.getStateType() + "], " +
-                                               "registered with [" + 
newMetaInfo.getStateType() + "].");
-                       }
-
-                       // check compatibility results to determine if state 
migration is required
-                       CompatibilityResult<N> namespaceCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
-                               restoredMetaInfo.getNamespaceSerializer(),
-                               null,
-                               
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
-                               newMetaInfo.getNamespaceSerializer());
-
-                       CompatibilityResult<S> stateCompatibility = 
CompatibilityUtil.resolveCompatibilityResult(
-                               restoredMetaInfo.getStateSerializer(),
-                               UnloadableDummyTypeSerializer.class,
-                               
restoredMetaInfo.getStateSerializerConfigSnapshot(),
-                               newMetaInfo.getStateSerializer());
-
-                       if (namespaceCompatibility.isRequiresMigration() || 
stateCompatibility.isRequiresMigration()) {
-                               // TODO state migration currently isn't 
possible.
-                               throw new StateMigrationException("State 
migration isn't supported, yet.");
-                       } else {
-                               stateInfo.f1 = newMetaInfo;
-                               return stateInfo.f0;
-                       }
-               }
-
-               byte[] nameBytes = 
descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
-               
Preconditions.checkState(!Arrays.equals(DEFAULT_COLUMN_FAMILY_NAME_BYTES, 
nameBytes),
-                       "The chosen state name 'default' collides with the name 
of the default column family!");
-
-               ColumnFamilyDescriptor columnDescriptor = new 
ColumnFamilyDescriptor(nameBytes, columnOptions);
-
-               final ColumnFamilyHandle columnFamily;
-
-               try {
-                       columnFamily = db.createColumnFamily(columnDescriptor);
-               } catch (RocksDBException e) {
-                       throw new IOException("Error creating 
ColumnFamilyHandle.", e);
-               }
-
-               Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<N, S>> tuple =
-                       new Tuple2<>(columnFamily, newMetaInfo);
-               Map rawAccess = kvStateInformation;
-               rawAccess.put(descriptor.getName(), tuple);
-               return columnFamily;
-       }
-
-       @Override
-       protected <N, T> InternalValueState<N, T> createValueState(
-               TypeSerializer<N> namespaceSerializer,
-               ValueStateDescriptor<T> stateDesc) throws Exception {
-
-               ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
-
-               return new RocksDBValueState<>(columnFamily, 
namespaceSerializer,  stateDesc, this);
-       }
-
-       @Override
-       protected <N, T> InternalListState<N, T> createListState(
-               TypeSerializer<N> namespaceSerializer,
-               ListStateDescriptor<T> stateDesc) throws Exception {
-
-               ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
-
-               return new RocksDBListState<>(columnFamily, 
namespaceSerializer, stateDesc, this);
-       }
-
-       @Override
-       protected <N, T> InternalReducingState<N, T> createReducingState(
-               TypeSerializer<N> namespaceSerializer,
-               ReducingStateDescriptor<T> stateDesc) throws Exception {
-
-               ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
-
-               return new RocksDBReducingState<>(columnFamily, 
namespaceSerializer,  stateDesc, this);
-       }
-
-       @Override
-       protected <N, T, ACC, R> InternalAggregatingState<N, T, R> 
createAggregatingState(
-               TypeSerializer<N> namespaceSerializer,
-               AggregatingStateDescriptor<T, ACC, R> stateDesc) throws 
Exception {
-
-               ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
-               return new RocksDBAggregatingState<>(columnFamily, 
namespaceSerializer, stateDesc, this);
-       }
-
-       @Override
-       protected <N, T, ACC> InternalFoldingState<N, T, ACC> 
createFoldingState(
-               TypeSerializer<N> namespaceSerializer,
-               FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-
-               ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
-
-               return new RocksDBFoldingState<>(columnFamily, 
namespaceSerializer, stateDesc, this);
-       }
-
-       @Override
-       protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(
-               TypeSerializer<N> namespaceSerializer,
-               MapStateDescriptor<UK, UV> stateDesc) throws Exception {
-
-               ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, 
namespaceSerializer);
-
-               return new RocksDBMapState<>(columnFamily, namespaceSerializer, 
stateDesc, this);
-       }
-
-       /**
-        * Wraps a RocksDB iterator to cache it's current key and assigns an id 
for the key/value state to the iterator.
-        * Used by #MergeIterator.
-        */
-       static final class MergeIterator implements AutoCloseable {
-
-               /**
-                * @param iterator  The #RocksIterator to wrap .
-                * @param kvStateId Id of the K/V state to which this iterator 
belongs.
-                */
-               MergeIterator(RocksIterator iterator, int kvStateId) {
-                       this.iterator = Preconditions.checkNotNull(iterator);
-                       this.currentKey = iterator.key();
-                       this.kvStateId = kvStateId;
-               }
-
-               private final RocksIterator iterator;
-               private byte[] currentKey;
-               private final int kvStateId;
-
-               public byte[] getCurrentKey() {
-                       return currentKey;
-               }
-
-               public void setCurrentKey(byte[] currentKey) {
-                       this.currentKey = currentKey;
-               }
-
-               public RocksIterator getIterator() {
-                       return iterator;
-               }
-
-               public int getKvStateId() {
-                       return kvStateId;
-               }
-
-               @Override
-               public void close() {
-                       IOUtils.closeQuietly(iterator);
-               }
-       }
-
-       /**
-        * Iterator that merges multiple RocksDB iterators to partition all 
states into contiguous key-groups.
-        * The resulting iteration sequence is ordered by (key-group, kv-state).
-        */
-       static final class RocksDBMergeIterator implements AutoCloseable {
-
-               private final PriorityQueue<MergeIterator> heap;
-               private final int keyGroupPrefixByteCount;
-               private boolean newKeyGroup;
-               private boolean newKVState;
-               private boolean valid;
-
-               private MergeIterator currentSubIterator;
-
-               private static final List<Comparator<MergeIterator>> 
COMPARATORS;
-
-               static {
-                       int maxBytes = 4;
-                       COMPARATORS = new ArrayList<>(maxBytes);
-                       for (int i = 0; i < maxBytes; ++i) {
-                               final int currentBytes = i;
-                               COMPARATORS.add(new Comparator<MergeIterator>() 
{
-                                       @Override
-                                       public int compare(MergeIterator o1, 
MergeIterator o2) {
-                                               int arrayCmpRes = 
compareKeyGroupsForByteArrays(
-                                                       o1.currentKey, 
o2.currentKey, currentBytes);
-                                               return arrayCmpRes == 0 ? 
o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
-                                       }
-                               });
-                       }
-               }
-
-               RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> 
kvStateIterators, final int keyGroupPrefixByteCount) {
-                       Preconditions.checkNotNull(kvStateIterators);
-                       this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
-
-                       Comparator<MergeIterator> iteratorComparator = 
COMPARATORS.get(keyGroupPrefixByteCount);
-
-                       if (kvStateIterators.size() > 0) {
-                               PriorityQueue<MergeIterator> 
iteratorPriorityQueue =
-                                       new 
PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
-
-                               for (Tuple2<RocksIterator, Integer> 
rocksIteratorWithKVStateId : kvStateIterators) {
-                                       final RocksIterator rocksIterator = 
rocksIteratorWithKVStateId.f0;
-                                       rocksIterator.seekToFirst();
-                                       if (rocksIterator.isValid()) {
-                                               iteratorPriorityQueue.offer(new 
MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
-                                       } else {
-                                               
IOUtils.closeQuietly(rocksIterator);
-                                       }
-                               }
-
-                               kvStateIterators.clear();
-
-                               this.heap = iteratorPriorityQueue;
-                               this.valid = !heap.isEmpty();
-                               this.currentSubIterator = heap.poll();
-                       } else {
-                               // creating a PriorityQueue of size 0 results 
in an exception.
-                               this.heap = null;
-                               this.valid = false;
-                       }
-
-                       this.newKeyGroup = true;
-                       this.newKVState = true;
-               }
-
-               /**
-                * Advance the iterator. Should only be called if {@link 
#isValid()} returned true. Valid can only chance after
-                * calls to {@link #next()}.
-                */
-               public void next() {
-                       newKeyGroup = false;
-                       newKVState = false;
-
-                       final RocksIterator rocksIterator = 
currentSubIterator.getIterator();
-                       rocksIterator.next();
-
-                       byte[] oldKey = currentSubIterator.getCurrentKey();
-                       if (rocksIterator.isValid()) {
-                               currentSubIterator.currentKey = 
rocksIterator.key();
-
-                               if (isDifferentKeyGroup(oldKey, 
currentSubIterator.getCurrentKey())) {
-                                       heap.offer(currentSubIterator);
-                                       currentSubIterator = heap.poll();
-                                       newKVState = 
currentSubIterator.getIterator() != rocksIterator;
-                                       detectNewKeyGroup(oldKey);
-                               }
-                       } else {
-                               IOUtils.closeQuietly(rocksIterator);
-
-                               if (heap.isEmpty()) {
-                                       currentSubIterator = null;
-                                       valid = false;
-                               } else {
-                                       currentSubIterator = heap.poll();
-                                       newKVState = true;
-                                       detectNewKeyGroup(oldKey);
-                               }
-                       }
-               }
-
-               private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
-                       return 0 != compareKeyGroupsForByteArrays(a, b, 
keyGroupPrefixByteCount);
-               }
-
-               private void detectNewKeyGroup(byte[] oldKey) {
-                       if (isDifferentKeyGroup(oldKey, 
currentSubIterator.currentKey)) {
-                               newKeyGroup = true;
-                       }
-               }
-
-               /**
-                * @return key-group for the current key
-                */
-               public int keyGroup() {
-                       int result = 0;
-                       //big endian decode
-                       for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
-                               result <<= 8;
-                               result |= (currentSubIterator.currentKey[i] & 
0xFF);
-                       }
-                       return result;
-               }
-
-               public byte[] key() {
-                       return currentSubIterator.getCurrentKey();
-               }
-
-               public byte[] value() {
-                       return currentSubIterator.getIterator().value();
-               }
-
-               /**
-                * @return Id of K/V state to which the current key belongs.
-                */
-               public int kvStateId() {
-                       return currentSubIterator.getKvStateId();
-               }
-
-               /**
-                * Indicates if current key starts a new k/v-state, i.e. belong 
to a different k/v-state than it's predecessor.
-                * @return true iff the current key belong to a different 
k/v-state than it's predecessor.
-                */
-               public boolean isNewKeyValueState() {
-                       return newKVState;
-               }
-
-               /**
-                * Indicates if current key starts a new key-group, i.e. belong 
to a different key-group than it's predecessor.
-                * @return true iff the current key belong to a different 
key-group than it's predecessor.
-                */
-               public boolean isNewKeyGroup() {
-                       return newKeyGroup;
-               }
-
-               /**
-                * Check if the iterator is still valid. Getters like {@link 
#key()}, {@link #value()}, etc. as well as
-                * {@link #next()} should only be called if valid returned 
true. Should be checked after each call to
-                * {@link #next()} before accessing iterator state.
-                * @return True iff this iterator is valid.
-                */
-               public boolean isValid() {
-                       return valid;
-               }
-
-               private static int compareKeyGroupsForByteArrays(byte[] a, 
byte[] b, int len) {
-                       for (int i = 0; i < len; ++i) {
-                               int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
-                               if (diff != 0) {
-                                       return diff;
-                               }
-                       }
-                       return 0;
-               }
-
-               @Override
-               public void close() {
-                       IOUtils.closeQuietly(currentSubIterator);
-                       currentSubIterator = null;
-
-                       IOUtils.closeAllQuietly(heap);
-                       heap.clear();
-               }
-       }
-
-       /**
-        * Only visible for testing, DO NOT USE.
-        */
-       @VisibleForTesting
-       public File getInstanceBasePath() {
-               return instanceBasePath;
-       }
-
-       @Override
-       public boolean supportsAsynchronousSnapshots() {
-               return true;
-       }
-
-       @VisibleForTesting
-       @SuppressWarnings("unchecked")
-       @Override
-       public int numStateEntries() {
-               int count = 0;
-
-               for (Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>> column : 
kvStateInformation.values()) {
-                       try (RocksIterator rocksIterator = 
db.newIterator(column.f0)) {
-                               rocksIterator.seekToFirst();
-
-                               while (rocksIterator.isValid()) {
-                                       count++;
-                                       rocksIterator.next();
-                               }
-                       }
-               }
-
-               return count;
-       }
-
-       private static class RocksIteratorWrapper<K> implements Iterator<K> {
-               private final RocksIterator iterator;
-               private final String state;
-               private final TypeSerializer<K> keySerializer;
-               private final int keyGroupPrefixBytes;
-
-               public RocksIteratorWrapper(
-                               RocksIterator iterator,
-                               String state,
-                               TypeSerializer<K> keySerializer,
-                               int keyGroupPrefixBytes) {
-                       this.iterator = Preconditions.checkNotNull(iterator);
-                       this.state = Preconditions.checkNotNull(state);
-                       this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
-                       this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
-               }
-
-               @Override
-               public boolean hasNext() {
-                       return iterator.isValid();
-               }
-
-               @Override
-               public K next() {
-                       if (!hasNext()) {
-                               throw new NoSuchElementException("Failed to 
access state [" + state + "]");
-                       }
-                       try {
-                               byte[] key = iterator.key();
-                                       DataInputViewStreamWrapper dataInput = 
new DataInputViewStreamWrapper(
-                                       new ByteArrayInputStreamWithPos(key, 
keyGroupPrefixBytes, key.length - keyGroupPrefixBytes));
-                               K value = keySerializer.deserialize(dataInput);
-                               iterator.next();
-                               return value;
-                       } catch (IOException e) {
-                               throw new FlinkRuntimeException("Failed to 
access state [" + state + "]", e);
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
deleted file mode 100644
index f0481ec..0000000
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.util.Preconditions;
-
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.WriteOptions;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * {@link ListState} implementation that stores state in RocksDB.
- *
- * <p>{@link RocksDBStateBackend} must ensure that we set the
- * {@link org.rocksdb.StringAppendOperator} on the column family that we use 
for our state since
- * we use the {@code merge()} call.
- *
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <V> The type of the values in the list state.
- */
-public class RocksDBListState<K, N, V>
-       extends AbstractRocksDBState<K, N, ListState<V>, 
ListStateDescriptor<V>, List<V>>
-       implements InternalListState<N, V> {
-
-       /** Serializer for the values. */
-       private final TypeSerializer<V> valueSerializer;
-
-       /**
-        * We disable writes to the write-ahead-log here. We can't have these 
in the base class
-        * because JNI segfaults for some reason if they are.
-        */
-       private final WriteOptions writeOptions;
-
-       /**
-        * Separator of StringAppendTestOperator in RocksDB.
-        */
-       private static final byte DELIMITER = ',';
-
-       /**
-        * Creates a new {@code RocksDBListState}.
-        *
-        * @param namespaceSerializer The serializer for the namespace.
-        * @param stateDesc The state identifier for the state. This contains 
name
-        *                     and can create a default state value.
-        */
-       public RocksDBListState(ColumnFamilyHandle columnFamily,
-                       TypeSerializer<N> namespaceSerializer,
-                       ListStateDescriptor<V> stateDesc,
-                       RocksDBKeyedStateBackend<K> backend) {
-
-               super(columnFamily, namespaceSerializer, stateDesc, backend);
-               this.valueSerializer = stateDesc.getElementSerializer();
-
-               writeOptions = new WriteOptions();
-               writeOptions.setDisableWAL(true);
-       }
-
-       @Override
-       public Iterable<V> get() {
-               try {
-                       writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = keySerializationStream.toByteArray();
-                       byte[] valueBytes = backend.db.get(columnFamily, key);
-
-                       if (valueBytes == null) {
-                               return null;
-                       }
-
-                       ByteArrayInputStream bais = new 
ByteArrayInputStream(valueBytes);
-                       DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
-
-                       List<V> result = new ArrayList<>();
-                       while (in.available() > 0) {
-                               result.add(valueSerializer.deserialize(in));
-                               if (in.available() > 0) {
-                                       in.readByte();
-                               }
-                       }
-                       return result;
-               } catch (IOException | RocksDBException e) {
-                       throw new RuntimeException("Error while retrieving data 
from RocksDB", e);
-               }
-       }
-
-       @Override
-       public void add(V value) throws IOException {
-               Preconditions.checkNotNull(value, "You cannot add null to a 
ListState.");
-
-               try {
-                       writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = keySerializationStream.toByteArray();
-                       keySerializationStream.reset();
-                       DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream);
-                       valueSerializer.serialize(value, out);
-                       backend.db.merge(columnFamily, writeOptions, key, 
keySerializationStream.toByteArray());
-               } catch (Exception e) {
-                       throw new RuntimeException("Error while adding data to 
RocksDB", e);
-               }
-       }
-
-       @Override
-       public void mergeNamespaces(N target, Collection<N> sources) throws 
Exception {
-               if (sources == null || sources.isEmpty()) {
-                       return;
-               }
-
-               // cache key and namespace
-               final K key = backend.getCurrentKey();
-               final int keyGroup = backend.getCurrentKeyGroupIndex();
-
-               try {
-                       // create the target full-binary-key
-                       writeKeyWithGroupAndNamespace(
-                                       keyGroup, key, target,
-                                       keySerializationStream, 
keySerializationDataOutputView);
-                       final byte[] targetKey = 
keySerializationStream.toByteArray();
-
-                       // merge the sources to the target
-                       for (N source : sources) {
-                               if (source != null) {
-                                       writeKeyWithGroupAndNamespace(
-                                                       keyGroup, key, source,
-                                                       keySerializationStream, 
keySerializationDataOutputView);
-
-                                       byte[] sourceKey = 
keySerializationStream.toByteArray();
-                                       byte[] valueBytes = 
backend.db.get(columnFamily, sourceKey);
-                                       backend.db.delete(columnFamily, 
sourceKey);
-
-                                       if (valueBytes != null) {
-                                               backend.db.merge(columnFamily, 
writeOptions, targetKey, valueBytes);
-                                       }
-                               }
-                       }
-               }
-               catch (Exception e) {
-                       throw new Exception("Error while merging state in 
RocksDB", e);
-               }
-       }
-
-       @Override
-       public void update(List<V> values) throws Exception {
-               Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
-
-               clear();
-
-               if (!values.isEmpty()) {
-                       try {
-                               writeCurrentKeyWithGroupAndNamespace();
-                               byte[] key = 
keySerializationStream.toByteArray();
-
-                               byte[] premerge = getPreMergedValue(values);
-                               if (premerge != null) {
-                                       backend.db.put(columnFamily, 
writeOptions, key, premerge);
-                               } else {
-                                       throw new IOException("Failed pre-merge 
values in update()");
-                               }
-                       } catch (IOException | RocksDBException e) {
-                               throw new RuntimeException("Error while 
updating data to RocksDB", e);
-                       }
-               }
-       }
-
-       @Override
-       public void addAll(List<V> values) throws Exception {
-               Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
-
-               if (!values.isEmpty()) {
-                       try {
-                               writeCurrentKeyWithGroupAndNamespace();
-                               byte[] key = 
keySerializationStream.toByteArray();
-
-                               byte[] premerge = getPreMergedValue(values);
-                               if (premerge != null) {
-                                       backend.db.merge(columnFamily, 
writeOptions, key, premerge);
-                               } else {
-                                       throw new IOException("Failed pre-merge 
values in addAll()");
-                               }
-                       } catch (IOException | RocksDBException e) {
-                               throw new RuntimeException("Error while 
updating data to RocksDB", e);
-                       }
-               }
-       }
-
-       private byte[] getPreMergedValue(List<V> values) throws IOException {
-               DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream);
-
-               keySerializationStream.reset();
-               boolean first = true;
-               for (V value : values) {
-                       Preconditions.checkNotNull(value, "You cannot add null 
to a ListState.");
-                       if (first) {
-                               first = false;
-                       } else {
-                               keySerializationStream.write(DELIMITER);
-                       }
-                       valueSerializer.serialize(value, out);
-               }
-
-               return keySerializationStream.toByteArray();
-       }
-}

Reply via email to