http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
deleted file mode 100644
index e8c34cc..0000000
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ /dev/null
@@ -1,532 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.state.MapState;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
-import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-import org.apache.flink.util.Preconditions;
-
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * {@link MapState} implementation that stores state in RocksDB.
- *
- * <p>{@link RocksDBStateBackend} must ensure that we set the
- * {@link org.rocksdb.StringAppendOperator} on the column family that we use 
for our state since
- * we use the {@code merge()} call.
- *
- * @param <K>  The type of the key.
- * @param <N>  The type of the namespace.
- * @param <UK> The type of the keys in the map state.
- * @param <UV> The type of the values in the map state.
- */
-public class RocksDBMapState<K, N, UK, UV>
-       extends AbstractRocksDBState<K, N, MapState<UK, UV>, 
MapStateDescriptor<UK, UV>, Map<UK, UV>>
-       implements InternalMapState<N, UK, UV> {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBMapState.class);
-
-       /** Serializer for the keys and values. */
-       private final TypeSerializer<UK> userKeySerializer;
-       private final TypeSerializer<UV> userValueSerializer;
-
-       /**
-        * We disable writes to the write-ahead-log here. We can't have these 
in the base class
-        * because JNI segfaults for some reason if they are.
-        */
-       private final WriteOptions writeOptions;
-
-       /**
-        * Creates a new {@code RocksDBMapState}.
-        *
-        * @param namespaceSerializer The serializer for the namespace.
-        * @param stateDesc The state identifier for the state.
-        */
-       public RocksDBMapState(ColumnFamilyHandle columnFamily,
-                       TypeSerializer<N> namespaceSerializer,
-                       MapStateDescriptor<UK, UV> stateDesc,
-                       RocksDBKeyedStateBackend<K> backend) {
-
-               super(columnFamily, namespaceSerializer, stateDesc, backend);
-
-               this.userKeySerializer = stateDesc.getKeySerializer();
-               this.userValueSerializer = stateDesc.getValueSerializer();
-
-               writeOptions = new WriteOptions();
-               writeOptions.setDisableWAL(true);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  MapState Implementation
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Looks up the value stored for a user key under the current key and namespace.
-        *
-        * @param userKey The user key to look up.
-        * @return The deserialized value, or {@code null} when RocksDB holds no entry for the
-        *         key (an entry that was stored with a null value also yields {@code null}).
-        */
-       @Override
-       public UV get(UK userKey) throws IOException, RocksDBException {
-               final byte[] serializedKey = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
-               final byte[] storedBytes = backend.db.get(columnFamily, serializedKey);
-
-               if (storedBytes == null) {
-                       return null;
-               }
-
-               return deserializeUserValue(storedBytes);
-       }
-
-       /**
-        * Writes a value under a user key for the current key and namespace.
-        *
-        * @param userKey The user key to write.
-        * @param userValue The value to associate with the key; {@code null} is written as an
-        *                  explicit null marker.
-        */
-       @Override
-       public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
-               // The key bytes are materialized first because serializeUserValue() resets and
-               // reuses the same serialization stream.
-               final byte[] serializedKey = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
-               final byte[] serializedValue = serializeUserValue(userValue);
-
-               backend.db.put(columnFamily, writeOptions, serializedKey, serializedValue);
-       }
-
-       /**
-        * Writes every mapping of the given map through {@link #put(Object, Object)}.
-        *
-        * @param map The mappings to store; a {@code null} map is ignored.
-        */
-       @Override
-       public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
-               if (map != null) {
-                       for (Map.Entry<UK, UV> mapping : map.entrySet()) {
-                               final UK mappedKey = mapping.getKey();
-                               final UV mappedValue = mapping.getValue();
-                               put(mappedKey, mappedValue);
-                       }
-               }
-       }
-
-       /**
-        * Deletes the entry stored for a user key under the current key and namespace.
-        *
-        * @param userKey The user key whose entry is removed.
-        */
-       @Override
-       public void remove(UK userKey) throws IOException, RocksDBException {
-               final byte[] serializedKey = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
-               backend.db.remove(columnFamily, writeOptions, serializedKey);
-       }
-
-       /**
-        * Tells whether RocksDB holds an entry for a user key under the current key and namespace.
-        *
-        * @param userKey The user key to probe.
-        * @return {@code true} if a raw value is stored for the key, {@code false} otherwise.
-        */
-       @Override
-       public boolean contains(UK userKey) throws IOException, RocksDBException {
-               final byte[] serializedKey = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
-               final boolean present = backend.db.get(columnFamily, serializedKey) != null;
-
-               return present;
-       }
-
-       /**
-        * Returns the entries stored under the current key and namespace.
-        *
-        * <p>The returned iterable wraps a single iterator that is created when this method is
-        * called, so every call to {@code iterator()} on it hands out that same iterator.
-        *
-        * @return An iterable over the entries, or {@code null} if there are none.
-        */
-       @Override
-       public Iterable<Map.Entry<UK, UV>> entries() throws IOException, RocksDBException {
-               final Iterator<Map.Entry<UK, UV>> entryIterator = iterator();
-
-               if (entryIterator.hasNext()) {
-                       return new Iterable<Map.Entry<UK, UV>>() {
-                               @Override
-                               public Iterator<Map.Entry<UK, UV>> iterator() {
-                                       return entryIterator;
-                               }
-                       };
-               }
-
-               // Return null to make the behavior consistent with other states.
-               return null;
-       }
-
-       /**
-        * Returns a view over the user keys stored under the current key and namespace.
-        *
-        * <p>The key/namespace prefix is serialized when this method is called; each call to
-        * {@code iterator()} on the result starts a new scan over that prefix.
-        *
-        * @return An iterable over the user keys. Its iterators return {@code null} from
-        *         {@code next()} once all entries have been consumed.
-        */
-       @Override
-       public Iterable<UK> keys() throws IOException, RocksDBException {
-               final byte[] keyPrefix = serializeCurrentKeyAndNamespace();
-
-               return new Iterable<UK>() {
-                       @Override
-                       public Iterator<UK> iterator() {
-                               return new RocksDBMapIterator<UK>(backend.db, keyPrefix) {
-                                       @Override
-                                       public UK next() {
-                                               final RocksDBMapEntry current = nextEntry();
-                                               if (current == null) {
-                                                       return null;
-                                               }
-                                               return current.getKey();
-                                       }
-                               };
-                       }
-               };
-       }
-
-       /**
-        * Returns a view over the user values stored under the current key and namespace.
-        *
-        * <p>The key/namespace prefix is serialized when this method is called; each call to
-        * {@code iterator()} on the result starts a new scan over that prefix.
-        *
-        * @return An iterable over the user values. Its iterators return {@code null} from
-        *         {@code next()} once all entries have been consumed.
-        */
-       @Override
-       public Iterable<UV> values() throws IOException, RocksDBException {
-               final byte[] keyPrefix = serializeCurrentKeyAndNamespace();
-
-               return new Iterable<UV>() {
-                       @Override
-                       public Iterator<UV> iterator() {
-                               return new RocksDBMapIterator<UV>(backend.db, keyPrefix) {
-                                       @Override
-                                       public UV next() {
-                                               final RocksDBMapEntry current = nextEntry();
-                                               if (current == null) {
-                                                       return null;
-                                               }
-                                               return current.getValue();
-                                       }
-                               };
-                       }
-               };
-       }
-
-       /**
-        * Creates an iterator over the entries stored under the current key and namespace.
-        *
-        * @return An iterator whose {@code next()} hands out the entries produced by
-        *         {@code nextEntry()}, i.e. {@code null} once the entries are exhausted.
-        */
-       @Override
-       public Iterator<Map.Entry<UK, UV>> iterator() throws IOException, RocksDBException {
-               final byte[] keyPrefix = serializeCurrentKeyAndNamespace();
-
-               final Iterator<Map.Entry<UK, UV>> entryIterator =
-                               new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, keyPrefix) {
-                                       @Override
-                                       public Map.Entry<UK, UV> next() {
-                                               return nextEntry();
-                                       }
-                               };
-
-               return entryIterator;
-       }
-
-       /**
-        * Removes every entry stored under the current key and namespace, one entry at a time.
-        *
-        * <p>Any failure during the scan or a removal is logged as a warning and not rethrown,
-        * so entries that were not yet visited stay in place.
-        */
-       @Override
-       public void clear() {
-               try {
-                       for (Iterator<Map.Entry<UK, UV>> entryIterator = iterator(); entryIterator.hasNext(); ) {
-                               entryIterator.next();
-                               entryIterator.remove();
-                       }
-               } catch (Exception e) {
-                       LOG.warn("Error while cleaning the state.", e);
-               }
-       }
-
-       /**
-        * Serializes all map entries stored under the given serialized key and namespace.
-        *
-        * <p>The key and namespace are deserialized, the key group is recomputed from the key,
-        * and the resulting RocksDB key prefix is scanned for entries.
-        *
-        * @param serializedKeyAndNamespace The serialized key and namespace; must not be null.
-        * @return The entries serialized by {@code KvStateSerializer.serializeMap}, or
-        *         {@code null} if no entry exists under the prefix.
-        */
-       @Override
-       @SuppressWarnings("unchecked")
-       public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
-               Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
-
-               //TODO make KvStateSerializer key-group aware to save this round trip and key-group computation
-               final Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
-                               serializedKeyAndNamespace,
-                               backend.getKeySerializer(),
-                               namespaceSerializer);
-
-               final int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(
-                               keyAndNamespace.f0,
-                               backend.getNumberOfKeyGroups());
-
-               // Rebuild the key prefix in private buffers rather than the shared ones.
-               final ByteArrayOutputStreamWithPos prefixStream = new ByteArrayOutputStreamWithPos(128);
-               final DataOutputViewStreamWrapper prefixView = new DataOutputViewStreamWrapper(prefixStream);
-               writeKeyWithGroupAndNamespace(keyGroup, keyAndNamespace.f0, keyAndNamespace.f1, prefixStream, prefixView);
-               final byte[] keyPrefixBytes = prefixStream.toByteArray();
-
-               final Iterator<Map.Entry<UK, UV>> entryIterator =
-                               new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, keyPrefixBytes) {
-                                       @Override
-                                       public Map.Entry<UK, UV> next() {
-                                               return nextEntry();
-                                       }
-                               };
-
-               // Return null to make the behavior consistent with other backends
-               if (!entryIterator.hasNext()) {
-                       return null;
-               }
-
-               final Iterable<Map.Entry<UK, UV>> singlePassEntries = new Iterable<Map.Entry<UK, UV>>() {
-                       @Override
-                       public Iterator<Map.Entry<UK, UV>> iterator() {
-                               return entryIterator;
-                       }
-               };
-
-               return KvStateSerializer.serializeMap(singlePassEntries, userKeySerializer, userValueSerializer);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Serialization Methods
-       // 
------------------------------------------------------------------------
-
-       private byte[] serializeCurrentKeyAndNamespace() throws IOException {
-               writeCurrentKeyWithGroupAndNamespace();
-
-               return keySerializationStream.toByteArray();
-       }
-
-       private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) 
throws IOException {
-               writeCurrentKeyWithGroupAndNamespace();
-               userKeySerializer.serialize(userKey, 
keySerializationDataOutputView);
-
-               return keySerializationStream.toByteArray();
-       }
-
-       private byte[] serializeUserValue(UV userValue) throws IOException {
-               keySerializationStream.reset();
-
-               if (userValue == null) {
-                       keySerializationDataOutputView.writeBoolean(true);
-               } else {
-                       keySerializationDataOutputView.writeBoolean(false);
-                       userValueSerializer.serialize(userValue, 
keySerializationDataOutputView);
-               }
-
-               return keySerializationStream.toByteArray();
-       }
-
-       private UK deserializeUserKey(byte[] rawKeyBytes) throws IOException {
-               ByteArrayInputStreamWithPos bais = new 
ByteArrayInputStreamWithPos(rawKeyBytes);
-               DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
-
-               readKeyWithGroupAndNamespace(bais, in);
-
-               return userKeySerializer.deserialize(in);
-       }
-
-       private UV deserializeUserValue(byte[] rawValueBytes) throws 
IOException {
-               ByteArrayInputStreamWithPos bais = new 
ByteArrayInputStreamWithPos(rawValueBytes);
-               DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
-
-               boolean isNull = in.readBoolean();
-
-               return isNull ? null : userValueSerializer.deserialize(in);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Internal Classes
-       // 
------------------------------------------------------------------------
-
-       /** A map entry in RocksDBMapState. */
-       private class RocksDBMapEntry implements Map.Entry<UK, UV> {
-               private final RocksDB db;
-
-               /** The raw bytes of the key stored in RocksDB. Each user key 
is stored in RocksDB
-                * with the format #KeyGroup#Key#Namespace#UserKey. */
-               private final byte[] rawKeyBytes;
-
-               /** The raw bytes of the value stored in RocksDB. */
-               private byte[] rawValueBytes;
-
-               /** True if the entry has been deleted. */
-               private boolean deleted;
-
-               /** The user key and value. The deserialization is performed 
lazily, i.e. the key
-                * and the value is deserialized only when they are accessed. */
-               private UK userKey = null;
-               private UV userValue = null;
-
-               RocksDBMapEntry(final RocksDB db, final byte[] rawKeyBytes, 
final byte[] rawValueBytes) {
-                       this.db = db;
-
-                       this.rawKeyBytes = rawKeyBytes;
-                       this.rawValueBytes = rawValueBytes;
-                       this.deleted = false;
-               }
-
-               public void remove() {
-                       deleted = true;
-                       rawValueBytes = null;
-
-                       try {
-                               db.remove(columnFamily, writeOptions, 
rawKeyBytes);
-                       } catch (RocksDBException e) {
-                               throw new RuntimeException("Error while 
removing data from RocksDB.", e);
-                       }
-               }
-
-               @Override
-               public UK getKey() {
-                       if (userKey == null) {
-                               try {
-                                       userKey = 
deserializeUserKey(rawKeyBytes);
-                               } catch (IOException e) {
-                                       throw new RuntimeException("Error while 
deserializing the user key.");
-                               }
-                       }
-
-                       return userKey;
-               }
-
-               @Override
-               public UV getValue() {
-                       if (deleted) {
-                               return null;
-                       } else {
-                               if (userValue == null) {
-                                       try {
-                                               userValue = 
deserializeUserValue(rawValueBytes);
-                                       } catch (IOException e) {
-                                               throw new 
RuntimeException("Error while deserializing the user value.");
-                                       }
-                               }
-
-                               return userValue;
-                       }
-               }
-
-               @Override
-               public UV setValue(UV value) {
-                       if (deleted) {
-                               throw new IllegalStateException("The value has 
already been deleted.");
-                       }
-
-                       UV oldValue = getValue();
-
-                       try {
-                               userValue = value;
-                               rawValueBytes = serializeUserValue(value);
-
-                               db.put(columnFamily, writeOptions, rawKeyBytes, 
rawValueBytes);
-                       } catch (IOException | RocksDBException e) {
-                               throw new RuntimeException("Error while putting 
data into RocksDB.", e);
-                       }
-
-                       return oldValue;
-               }
-       }
-
-       /** An auxiliary utility to scan all entries under the given key. */
-       private abstract class RocksDBMapIterator<T> implements Iterator<T> {
-
-               static final int CACHE_SIZE_BASE = 1;
-               static final int CACHE_SIZE_LIMIT = 128;
-
-               /** The db where data resides. */
-               private final RocksDB db;
-
-               /**
-                * The prefix bytes of the key being accessed. All entries 
under the same key
-                * has the same prefix, hence we can stop the iterating once 
coming across an
-                * entry with a different prefix.
-                */
-               private final byte[] keyPrefixBytes;
-
-               /**
-                * True if all entries have been accessed or the iterator has 
come across an
-                * entry with a different prefix.
-                */
-               private boolean expired = false;
-
-               /** A in-memory cache for the entries in the rocksdb. */
-               private ArrayList<RocksDBMapEntry> cacheEntries = new 
ArrayList<>();
-               private int cacheIndex = 0;
-
-               RocksDBMapIterator(final RocksDB db, final byte[] 
keyPrefixBytes) {
-                       this.db = db;
-                       this.keyPrefixBytes = keyPrefixBytes;
-               }
-
-               @Override
-               public boolean hasNext() {
-                       loadCache();
-
-                       return (cacheIndex < cacheEntries.size());
-               }
-
-               @Override
-               public void remove() {
-                       if (cacheIndex == 0 || cacheIndex > 
cacheEntries.size()) {
-                               throw new IllegalStateException("The remove 
operation must be called after an valid next operation.");
-                       }
-
-                       RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex 
- 1);
-                       lastEntry.remove();
-               }
-
-               final RocksDBMapEntry nextEntry() {
-                       loadCache();
-
-                       if (cacheIndex == cacheEntries.size()) {
-                               if (!expired) {
-                                       throw new IllegalStateException();
-                               }
-
-                               return null;
-                       }
-
-                       RocksDBMapEntry entry = cacheEntries.get(cacheIndex);
-                       cacheIndex++;
-
-                       return entry;
-               }
-
-               private void loadCache() {
-                       if (cacheIndex > cacheEntries.size()) {
-                               throw new IllegalStateException();
-                       }
-
-                       // Load cache entries only when the cache is empty and 
there still exist unread entries
-                       if (cacheIndex < cacheEntries.size() || expired) {
-                               return;
-                       }
-
-                       RocksIterator iterator = db.newIterator(columnFamily);
-
-                       /*
-                        * The iteration starts from the prefix bytes at the 
first loading. The cache then is
-                        * reloaded when the next entry to return is the last 
one in the cache. At that time,
-                        * we will start the iterating from the last returned 
entry.
-                        */
-                       RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? 
null : cacheEntries.get(cacheEntries.size() - 1);
-                       byte[] startBytes = (lastEntry == null ? keyPrefixBytes 
: lastEntry.rawKeyBytes);
-                       int numEntries = (lastEntry == null ? CACHE_SIZE_BASE : 
Math.min(cacheEntries.size() * 2, CACHE_SIZE_LIMIT));
-
-                       cacheEntries.clear();
-                       cacheIndex = 0;
-
-                       iterator.seek(startBytes);
-
-                       /*
-                        * If the last returned entry is not deleted, it will 
be the first entry in the
-                        * iterating. Skip it to avoid redundant access in such 
cases.
-                        */
-                       if (lastEntry != null && !lastEntry.deleted) {
-                               iterator.next();
-                       }
-
-                       while (true) {
-                               if (!iterator.isValid() || 
!underSameKey(iterator.key())) {
-                                       expired = true;
-                                       break;
-                               }
-
-                               if (cacheEntries.size() >= numEntries) {
-                                       break;
-                               }
-
-                               RocksDBMapEntry entry = new RocksDBMapEntry(db, 
iterator.key(), iterator.value());
-                               cacheEntries.add(entry);
-
-                               iterator.next();
-                       }
-
-                       iterator.close();
-               }
-
-               private boolean underSameKey(byte[] rawKeyBytes) {
-                       if (rawKeyBytes.length < keyPrefixBytes.length) {
-                               return false;
-                       }
-
-                       for (int i = 0; i < keyPrefixBytes.length; ++i) {
-                               if (rawKeyBytes[i] != keyPrefixBytes[i]) {
-                                       return false;
-                               }
-                       }
-
-                       return true;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
deleted file mode 100644
index b4c3f51..0000000
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.WriteOptions;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.Collection;
-
-/**
- * {@link ReducingState} implementation that stores state in RocksDB.
- *
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <V> The type of value that the state stores.
- */
-public class RocksDBReducingState<K, N, V>
-       extends AbstractRocksDBState<K, N, ReducingState<V>, 
ReducingStateDescriptor<V>, V>
-       implements InternalReducingState<N, V> {
-
-       /** Serializer for the values. */
-       private final TypeSerializer<V> valueSerializer;
-
-       /** User-specified reduce function. */
-       private final ReduceFunction<V> reduceFunction;
-
-       /**
-        * We disable writes to the write-ahead-log here. We can't have these 
in the base class
-        * because JNI segfaults for some reason if they are.
-        */
-       private final WriteOptions writeOptions;
-
-       /**
-        * Creates a new {@code RocksDBReducingState}.
-        *
-        * @param namespaceSerializer The serializer for the namespace.
-        * @param stateDesc The state identifier for the state. This contains 
name
-        *                     and can create a default state value.
-        */
-       public RocksDBReducingState(ColumnFamilyHandle columnFamily,
-                       TypeSerializer<N> namespaceSerializer,
-                       ReducingStateDescriptor<V> stateDesc,
-                       RocksDBKeyedStateBackend<K> backend) {
-
-               super(columnFamily, namespaceSerializer, stateDesc, backend);
-               this.valueSerializer = stateDesc.getSerializer();
-               this.reduceFunction = stateDesc.getReduceFunction();
-
-               writeOptions = new WriteOptions();
-               writeOptions.setDisableWAL(true);
-       }
-
-       @Override
-       public V get() {
-               try {
-                       writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = keySerializationStream.toByteArray();
-                       byte[] valueBytes = backend.db.get(columnFamily, key);
-                       if (valueBytes == null) {
-                               return null;
-                       }
-                       return valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
-               } catch (IOException | RocksDBException e) {
-                       throw new RuntimeException("Error while retrieving data 
from RocksDB", e);
-               }
-       }
-
-       @Override
-       public void add(V value) throws IOException {
-               try {
-                       writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = keySerializationStream.toByteArray();
-                       byte[] valueBytes = backend.db.get(columnFamily, key);
-
-                       DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream);
-                       if (valueBytes == null) {
-                               keySerializationStream.reset();
-                               valueSerializer.serialize(value, out);
-                               backend.db.put(columnFamily, writeOptions, key, 
keySerializationStream.toByteArray());
-                       } else {
-                               V oldValue = valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
-                               V newValue = reduceFunction.reduce(oldValue, 
value);
-                               keySerializationStream.reset();
-                               valueSerializer.serialize(newValue, out);
-                               backend.db.put(columnFamily, writeOptions, key, 
keySerializationStream.toByteArray());
-                       }
-               } catch (Exception e) {
-                       throw new RuntimeException("Error while adding data to 
RocksDB", e);
-               }
-       }
-
-       /**
-        * Merges the state of the source namespaces into the target namespace for the current key.
-        *
-        * <p>Each non-null source namespace is read and deleted; the values found are combined
-        * with the reduce function. If at least one source held a value, the combined value is
-        * reduced with the target's existing value (if any) and written to the target.
-        *
-        * @param target The namespace that receives the merged value.
-        * @param sources The namespaces to merge and delete; a null or empty collection is a no-op.
-        * @throws Exception if serialization, the reduce function or RocksDB fails.
-        */
-       @Override
-       public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
-               if (sources == null || sources.isEmpty()) {
-                       return;
-               }
-
-               // cache key and namespace
-               final K key = backend.getCurrentKey();
-               final int keyGroup = backend.getCurrentKeyGroupIndex();
-
-               try {
-                       V current = null;
-
-                       // merge the sources to the target
-                       for (N source : sources) {
-                               if (source != null) {
-
-                                       writeKeyWithGroupAndNamespace(
-                                                       keyGroup, key, source,
-                                                       keySerializationStream, keySerializationDataOutputView);
-
-                                       final byte[] sourceKey = keySerializationStream.toByteArray();
-                                       final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
-
-                                       // Use the WAL-disabled write options, as every other write of this state does.
-                                       backend.db.delete(columnFamily, writeOptions, sourceKey);
-
-                                       if (valueBytes != null) {
-                                               V value = valueSerializer.deserialize(
-                                                               new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
-
-                                               if (current != null) {
-                                                       current = reduceFunction.reduce(current, value);
-                                               }
-                                               else {
-                                                       current = value;
-                                               }
-                                       }
-                               }
-                       }
-
-                       // if something came out of merging the sources, merge it or write it to the target
-                       if (current != null) {
-                               // create the target full-binary-key
-                               writeKeyWithGroupAndNamespace(
-                                               keyGroup, key, target,
-                                               keySerializationStream, keySerializationDataOutputView);
-
-                               final byte[] targetKey = keySerializationStream.toByteArray();
-                               final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);
-
-                               if (targetValueBytes != null) {
-                                       // target also had a value, merge
-                                       V value = valueSerializer.deserialize(
-                                                       new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));
-
-                                       current = reduceFunction.reduce(current, value);
-                               }
-
-                               // serialize the resulting value
-                               keySerializationStream.reset();
-                               valueSerializer.serialize(current, keySerializationDataOutputView);
-
-                               // write the resulting value
-                               backend.db.put(columnFamily, writeOptions, targetKey, keySerializationStream.toByteArray());
-                       }
-               }
-               catch (Exception e) {
-                       throw new Exception("Error while merging state in RocksDB", e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
deleted file mode 100644
index b7e4794..0000000
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ /dev/null
@@ -1,691 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.CheckpointStorage;
-import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
-import org.apache.flink.runtime.state.ConfigurableStateBackend;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.OperatorStateBackend;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.util.AbstractID;
-
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-import org.rocksdb.NativeLibraryLoader;
-import org.rocksdb.RocksDB;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A State Backend that stores its state in {@code RocksDB}. This state 
backend can
- * store very large state that exceeds memory and spills to disk.
- *
- * <p>All key/value state (including windows) is stored in the key/value index 
of RocksDB.
- * For persistence against loss of machines, checkpoints take a snapshot of the
- * RocksDB database, and persist that snapshot in a file system (by default) or
- * another configurable state backend.
- *
- * <p>The behavior of the RocksDB instances can be parametrized by setting 
RocksDB Options
- * using the methods {@link #setPredefinedOptions(PredefinedOptions)} and
- * {@link #setOptions(OptionsFactory)}.
- */
-public class RocksDBStateBackend extends AbstractStateBackend implements 
ConfigurableStateBackend {
-
-       private static final long serialVersionUID = 1L;
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBStateBackend.class);
-
-       /** The number of (re)tries for loading the RocksDB JNI library. */
-       private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3;
-
-       private static boolean rocksDbInitialized = false;
-
-       // 
------------------------------------------------------------------------
-
-       // -- configuration values, set in the application / configuration
-
-       /** The state backend that we use for creating checkpoint streams. */
-       private final StateBackend checkpointStreamBackend;
-
-       /** Base paths for RocksDB directory, as configured.
-        * Null if not yet set, in which case the configuration values will be 
used.
-        * The configuration defaults to the TaskManager's temp directories. */
-       @Nullable
-       private Path[] localRocksDbDirectories;
-
-       /** The pre-configured option settings. */
-       private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
-
-       /** The options factory to create the RocksDB options in the cluster. */
-       @Nullable
-       private OptionsFactory optionsFactory;
-
-       /** True if incremental checkpointing is enabled.
-        * Null if not yet set, in which case the configuration values will be 
used. */
-       @Nullable
-       private Boolean enableIncrementalCheckpointing;
-
-       // -- runtime values, set on TaskManager when initializing / using the 
backend
-
-       /** Base paths for RocksDB directory, as initialized. */
-       private transient File[] initializedDbBasePaths;
-
-       /** JobID for uniquifying backup paths. */
-       private transient JobID jobId;
-
-       /** The index of the next directory to be used from {@link 
#initializedDbBasePaths}.*/
-       private transient int nextDirectory;
-
-       /** Whether we already lazily initialized our local storage 
directories. */
-       private transient boolean isInitialized;
-
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a new {@code RocksDBStateBackend} that stores its checkpoint 
data in the
-        * file system and location defined by the given URI.
-        *
-        * <p>A state backend that stores checkpoints in HDFS or S3 must 
specify the file system
-        * host and port in the URI, or have the Hadoop configuration that 
describes the file system
-        * (host / high-availability group / possibly credentials) either 
referenced from the Flink
-        * config, or included in the classpath.
-        *
-        * @param checkpointDataUri The URI describing the filesystem and path 
to the checkpoint data directory.
-        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
-        */
-       public RocksDBStateBackend(String checkpointDataUri) throws IOException 
{
-               this(new Path(checkpointDataUri).toUri());
-       }
-
-       /**
-        * Creates a new {@code RocksDBStateBackend} that stores its checkpoint 
data in the
-        * file system and location defined by the given URI.
-        *
-        * <p>A state backend that stores checkpoints in HDFS or S3 must 
specify the file system
-        * host and port in the URI, or have the Hadoop configuration that 
describes the file system
-        * (host / high-availability group / possibly credentials) either 
referenced from the Flink
-        * config, or included in the classpath.
-        *
-        * @param checkpointDataUri The URI describing the filesystem and path 
to the checkpoint data directory.
-        * @param enableIncrementalCheckpointing True if incremental 
checkpointing is enabled.
-        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
-        */
-       public RocksDBStateBackend(String checkpointDataUri, boolean 
enableIncrementalCheckpointing) throws IOException {
-               this(new Path(checkpointDataUri).toUri(), 
enableIncrementalCheckpointing);
-       }
-
-       /**
-        * Creates a new {@code RocksDBStateBackend} that stores its checkpoint 
data in the
-        * file system and location defined by the given URI.
-        *
-        * <p>A state backend that stores checkpoints in HDFS or S3 must 
specify the file system
-        * host and port in the URI, or have the Hadoop configuration that 
describes the file system
-        * (host / high-availability group / possibly credentials) either 
referenced from the Flink
-        * config, or included in the classpath.
-        *
-        * @param checkpointDataUri The URI describing the filesystem and path 
to the checkpoint data directory.
-        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
-        */
-       public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
-               this(new FsStateBackend(checkpointDataUri));
-       }
-
-       /**
-        * Creates a new {@code RocksDBStateBackend} that stores its checkpoint 
data in the
-        * file system and location defined by the given URI.
-        *
-        * <p>A state backend that stores checkpoints in HDFS or S3 must 
specify the file system
-        * host and port in the URI, or have the Hadoop configuration that 
describes the file system
-        * (host / high-availability group / possibly credentials) either 
referenced from the Flink
-        * config, or included in the classpath.
-        *
-        * @param checkpointDataUri The URI describing the filesystem and path 
to the checkpoint data directory.
-        * @param enableIncrementalCheckpointing True if incremental 
checkpointing is enabled.
-        * @throws IOException Thrown, if no file system can be found for the 
scheme in the URI.
-        */
-       public RocksDBStateBackend(URI checkpointDataUri, boolean 
enableIncrementalCheckpointing) throws IOException {
-               this(new FsStateBackend(checkpointDataUri), 
enableIncrementalCheckpointing);
-       }
-
-       /**
-        * Creates a new {@code RocksDBStateBackend} that uses the given state 
backend to store its
-        * checkpoint data streams. Typically, one would supply a filesystem or 
database state backend
-        * here where the snapshots from RocksDB would be stored.
-        *
-        * <p>The snapshots of the RocksDB state will be stored using the given 
backend's
-        * {@link StateBackend#createCheckpointStorage(JobID)}.
-        *
-        * @param checkpointStreamBackend The backend write the checkpoint 
streams to.
-        */
-       public RocksDBStateBackend(StateBackend checkpointStreamBackend) {
-               this.checkpointStreamBackend = 
checkNotNull(checkpointStreamBackend);
-       }
-
-       /**
-        * Creates a new {@code RocksDBStateBackend} that uses the given state 
backend to store its
-        * checkpoint data streams. Typically, one would supply a filesystem or 
database state backend
-        * here where the snapshots from RocksDB would be stored.
-        *
-        * <p>The snapshots of the RocksDB state will be stored using the given 
backend's
-        * {@link StateBackend#createCheckpointStorage(JobID)}.
-        *
-        * @param checkpointStreamBackend The backend write the checkpoint 
streams to.
-        * @param enableIncrementalCheckpointing True if incremental 
checkpointing is enabled.
-        */
-       public RocksDBStateBackend(StateBackend checkpointStreamBackend, 
boolean enableIncrementalCheckpointing) {
-               this.checkpointStreamBackend = 
checkNotNull(checkpointStreamBackend);
-               this.enableIncrementalCheckpointing = 
enableIncrementalCheckpointing;
-       }
-
-       /**
-        * @deprecated Use {@link #RocksDBStateBackend(StateBackend)} instead.
-        */
-       @Deprecated
-       public RocksDBStateBackend(AbstractStateBackend 
checkpointStreamBackend) {
-               this.checkpointStreamBackend = 
checkNotNull(checkpointStreamBackend);
-       }
-
-       /**
-        * @deprecated Use {@link #RocksDBStateBackend(StateBackend, boolean)} 
instead.
-        */
-       @Deprecated
-       public RocksDBStateBackend(AbstractStateBackend 
checkpointStreamBackend, boolean enableIncrementalCheckpointing) {
-               this.checkpointStreamBackend = 
checkNotNull(checkpointStreamBackend);
-               this.enableIncrementalCheckpointing = 
enableIncrementalCheckpointing;
-       }
-
-       /**
-        * Private constructor that creates a re-configured copy of the state 
backend.
-        *
-        * @param original The state backend to re-configure.
-        * @param config The configuration.
-        */
-       private RocksDBStateBackend(RocksDBStateBackend original, Configuration 
config) {
-               // reconfigure the state backend backing the streams
-               final StateBackend originalStreamBackend = 
original.checkpointStreamBackend;
-               this.checkpointStreamBackend = originalStreamBackend instanceof 
ConfigurableStateBackend ?
-                               ((ConfigurableStateBackend) 
originalStreamBackend).configure(config) :
-                               originalStreamBackend;
-
-               // configure incremental checkpoints
-               if (original.enableIncrementalCheckpointing != null) {
-                       this.enableIncrementalCheckpointing = 
original.enableIncrementalCheckpointing;
-               }
-               else {
-                       this.enableIncrementalCheckpointing =
-                                       
config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS);
-               }
-
-               // configure local directories
-               if (original.localRocksDbDirectories != null) {
-                       this.localRocksDbDirectories = 
original.localRocksDbDirectories;
-               }
-               else {
-                       final String rocksdbLocalPaths = 
config.getString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES);
-                       if (rocksdbLocalPaths != null) {
-                               String[] directories = 
rocksdbLocalPaths.split(",|" + File.pathSeparator);
-
-                               try {
-                                       setDbStoragePaths(directories);
-                               }
-                               catch (IllegalArgumentException e) {
-                                       throw new 
IllegalConfigurationException("Invalid configuration for RocksDB state " +
-                                                       "backend's local 
storage directories: " + e.getMessage(), e);
-                               }
-                       }
-               }
-
-               // copy remaining settings
-               this.predefinedOptions = original.predefinedOptions;
-               this.optionsFactory = original.optionsFactory;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Reconfiguration
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Creates a copy of this state backend that uses the values defined in 
the configuration
-        * for fields where that were not yet specified in this state backend.
-        *
-        * @param config the configuration
-        * @return The re-configured variant of the state backend
-        */
-       @Override
-       public RocksDBStateBackend configure(Configuration config) {
-               return new RocksDBStateBackend(this, config);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  State backend methods
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Gets the state backend that this RocksDB state backend uses to 
persist
-        * its bytes to.
-        *
-        * <p>This RocksDB state backend only implements the RocksDB specific 
parts, it
-        * relies on the 'CheckpointBackend' to persist the checkpoint and 
savepoint bytes
-        * streams.
-        */
-       public StateBackend getCheckpointBackend() {
-               return checkpointStreamBackend;
-       }
-
-       private void lazyInitializeForJob(
-                       Environment env,
-                       @SuppressWarnings("unused") String operatorIdentifier) 
throws IOException {
-
-               if (isInitialized) {
-                       return;
-               }
-
-               this.jobId = env.getJobID();
-
-               // initialize the paths where the local RocksDB files should be 
stored
-               if (localRocksDbDirectories == null) {
-                       // initialize from the temp directories
-                       initializedDbBasePaths = 
env.getIOManager().getSpillingDirectories();
-               }
-               else {
-                       List<File> dirs = new 
ArrayList<>(localRocksDbDirectories.length);
-                       String errorMessage = "";
-
-                       for (Path path : localRocksDbDirectories) {
-                               File f = new File(path.toUri().getPath());
-                               File testDir = new File(f, 
UUID.randomUUID().toString());
-                               if (!testDir.mkdirs()) {
-                                       String msg = "Local DB files directory 
'" + path
-                                                       + "' does not exist and 
cannot be created. ";
-                                       LOG.error(msg);
-                                       errorMessage += msg;
-                               } else {
-                                       dirs.add(f);
-                               }
-                               //noinspection ResultOfMethodCallIgnored
-                               testDir.delete();
-                       }
-
-                       if (dirs.isEmpty()) {
-                               throw new IOException("No local storage 
directories available. " + errorMessage);
-                       } else {
-                               initializedDbBasePaths = dirs.toArray(new 
File[dirs.size()]);
-                       }
-               }
-
-               nextDirectory = new 
Random().nextInt(initializedDbBasePaths.length);
-
-               isInitialized = true;
-       }
-
-       private File getNextStoragePath() {
-               int ni = nextDirectory + 1;
-               ni = ni >= initializedDbBasePaths.length ? 0 : ni;
-               nextDirectory = ni;
-
-               return initializedDbBasePaths[ni];
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Checkpoint initialization and persistent storage
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public CompletedCheckpointStorageLocation resolveCheckpoint(String 
pointer) throws IOException {
-               return checkpointStreamBackend.resolveCheckpoint(pointer);
-       }
-
-       @Override
-       public CheckpointStorage createCheckpointStorage(JobID jobId) throws 
IOException {
-               return checkpointStreamBackend.createCheckpointStorage(jobId);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  State holding data structures
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
-                       Environment env,
-                       JobID jobID,
-                       String operatorIdentifier,
-                       TypeSerializer<K> keySerializer,
-                       int numberOfKeyGroups,
-                       KeyGroupRange keyGroupRange,
-                       TaskKvStateRegistry kvStateRegistry) throws IOException 
{
-
-               // first, make sure that the RocksDB JNI library is loaded
-               // we do this explicitly here to have better error handling
-               String tempDir = 
env.getTaskManagerInfo().getTmpDirectories()[0];
-               ensureRocksDBIsLoaded(tempDir);
-
-               lazyInitializeForJob(env, operatorIdentifier);
-
-               File instanceBasePath =
-                               new File(getNextStoragePath(), "job-" + jobId + 
"_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID());
-
-               return new RocksDBKeyedStateBackend<>(
-                               operatorIdentifier,
-                               env.getUserClassLoader(),
-                               instanceBasePath,
-                               getDbOptions(),
-                               getColumnOptions(),
-                               kvStateRegistry,
-                               keySerializer,
-                               numberOfKeyGroups,
-                               keyGroupRange,
-                               env.getExecutionConfig(),
-                               isIncrementalCheckpointsEnabled());
-       }
-
-       @Override
-       public OperatorStateBackend createOperatorStateBackend(
-                       Environment env,
-                       String operatorIdentifier) throws Exception {
-
-               //the default for RocksDB; eventually there can be a operator 
state backend based on RocksDB, too.
-               final boolean asyncSnapshots = true;
-               return new DefaultOperatorStateBackend(
-                               env.getUserClassLoader(),
-                               env.getExecutionConfig(),
-                               asyncSnapshots);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Parameters
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Sets the path where the RocksDB local database files should be 
stored on the local
-        * file system. Setting this path overrides the default behavior, where 
the
-        * files are stored across the configured temp directories.
-        *
-        * <p>Passing {@code null} to this function restores the default 
behavior, where the configured
-        * temp directories will be used.
-        *
-        * @param path The path where the local RocksDB database files are 
stored.
-        */
-       public void setDbStoragePath(String path) {
-               setDbStoragePaths(path == null ? null : new String[] { path });
-       }
-
-       /**
-        * Sets the paths across which the local RocksDB database files are 
distributed on the local
-        * file system. Setting these paths overrides the default behavior, 
where the
-        * files are stored across the configured temp directories.
-        *
-        * <p>Each distinct state will be stored in one path, but when the 
state backend creates
-        * multiple states, they will store their files on different paths.
-        *
-        * <p>Passing {@code null} to this function restores the default 
behavior, where the configured
-        * temp directories will be used.
-        *
-        * @param paths The paths across which the local RocksDB database files 
will be spread.
-        */
-       public void setDbStoragePaths(String... paths) {
-               if (paths == null) {
-                       localRocksDbDirectories = null;
-               }
-               else if (paths.length == 0) {
-                       throw new IllegalArgumentException("empty paths");
-               }
-               else {
-                       Path[] pp = new Path[paths.length];
-
-                       for (int i = 0; i < paths.length; i++) {
-                               if (paths[i] == null) {
-                                       throw new 
IllegalArgumentException("null path");
-                               }
-
-                               pp[i] = new Path(paths[i]);
-                               String scheme = pp[i].toUri().getScheme();
-                               if (scheme != null && 
!scheme.equalsIgnoreCase("file")) {
-                                       throw new 
IllegalArgumentException("Path " + paths[i] + " has a non local scheme");
-                               }
-                       }
-
-                       localRocksDbDirectories = pp;
-               }
-       }
-
-       /**
-        *
-        * @return The configured DB storage paths, or null, if none were 
configured.
-        */
-       public String[] getDbStoragePaths() {
-               if (localRocksDbDirectories == null) {
-                       return null;
-               } else {
-                       String[] paths = new 
String[localRocksDbDirectories.length];
-                       for (int i = 0; i < paths.length; i++) {
-                               paths[i] = 
localRocksDbDirectories[i].toString();
-                       }
-                       return paths;
-               }
-       }
-
-       /**
-        * Gets whether incremental checkpoints are enabled for this state 
backend.
-        */
-       public boolean isIncrementalCheckpointsEnabled() {
-               if (enableIncrementalCheckpointing != null) {
-                       return enableIncrementalCheckpointing;
-               }
-               else {
-                       return 
CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Parametrize with RocksDB Options
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Sets the predefined options for RocksDB.
-        *
-        * <p>If a user-defined options factory is set (via {@link 
#setOptions(OptionsFactory)}),
-        * then the options from the factory are applied on top of the here 
specified
-        * predefined options.
-        *
-        * @param options The options to set (must not be null).
-        */
-       public void setPredefinedOptions(PredefinedOptions options) {
-               predefinedOptions = checkNotNull(options);
-       }
-
-       /**
-        * Gets the currently set predefined options for RocksDB.
-        * The default options (if nothing was set via {@link 
#setPredefinedOptions(PredefinedOptions)})
-        * are {@link PredefinedOptions#DEFAULT}.
-        *
-        * <p>If a user-defined  options factory is set (via {@link 
#setOptions(OptionsFactory)}),
-        * then the options from the factory are applied on top of the 
predefined options.
-        *
-        * @return The currently set predefined options for RocksDB.
-        */
-       public PredefinedOptions getPredefinedOptions() {
-               return predefinedOptions;
-       }
-
-       /**
-        * Sets {@link org.rocksdb.Options} for the RocksDB instances.
-        * Because the options are not serializable and hold native code 
references,
-        * they must be specified through a factory.
-        *
-        * <p>The options created by the factory here are applied on top of the 
pre-defined
-        * options profile selected via {@link 
#setPredefinedOptions(PredefinedOptions)}.
-        * If the pre-defined options profile is the default
-        * ({@link PredefinedOptions#DEFAULT}), then the factory fully controls 
the RocksDB
-        * options.
-        *
-        * @param optionsFactory The options factory that lazily creates the 
RocksDB options.
-        */
-       public void setOptions(OptionsFactory optionsFactory) {
-               this.optionsFactory = optionsFactory;
-       }
-
-       /**
-        * Gets the options factory that lazily creates the RocksDB options.
-        *
-        * @return The options factory.
-        */
-       public OptionsFactory getOptions() {
-               return optionsFactory;
-       }
-
-       /**
-        * Gets the RocksDB {@link DBOptions} to be used for all RocksDB 
instances.
-        */
-       public DBOptions getDbOptions() {
-               // initial options from pre-defined profile
-               DBOptions opt = predefinedOptions.createDBOptions();
-
-               // add user-defined options, if specified
-               if (optionsFactory != null) {
-                       opt = optionsFactory.createDBOptions(opt);
-               }
-
-               // add necessary default options
-               opt = opt.setCreateIfMissing(true);
-
-               return opt;
-       }
-
-       /**
-        * Gets the RocksDB {@link ColumnFamilyOptions} to be used for all 
RocksDB instances.
-        */
-       public ColumnFamilyOptions getColumnOptions() {
-               // initial options from pre-defined profile
-               ColumnFamilyOptions opt = 
predefinedOptions.createColumnOptions();
-
-               // add user-defined options, if specified
-               if (optionsFactory != null) {
-                       opt = optionsFactory.createColumnOptions(opt);
-               }
-
-               return opt;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  utilities
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return "RocksDBStateBackend{" +
-                               "checkpointStreamBackend=" + 
checkpointStreamBackend +
-                               ", localRocksDbDirectories=" + 
Arrays.toString(localRocksDbDirectories) +
-                               ", enableIncrementalCheckpointing=" + 
enableIncrementalCheckpointing +
-                               '}';
-       }
-
-       // 
------------------------------------------------------------------------
-       //  static library loading utilities
-       // 
------------------------------------------------------------------------
-
-       private void ensureRocksDBIsLoaded(String tempDirectory) throws 
IOException {
-               synchronized (RocksDBStateBackend.class) {
-                       if (!rocksDbInitialized) {
-
-                               final File tempDirParent = new 
File(tempDirectory).getAbsoluteFile();
-                               LOG.info("Attempting to load RocksDB native 
library and store it under '{}'", tempDirParent);
-
-                               Throwable lastException = null;
-                               for (int attempt = 1; attempt <= 
ROCKSDB_LIB_LOADING_ATTEMPTS; attempt++) {
-                                       try {
-                                               // when multiple instances of 
this class and RocksDB exist in different
-                                               // class loaders, then we can 
see the following exception:
-                                               // 
"java.lang.UnsatisfiedLinkError: Native Library 
/path/to/temp/dir/librocksdbjni-linux64.so
-                                               // already loaded in another 
class loader"
-
-                                               // to avoid that, we need to 
add a random element to the library file path
-                                               // (I know, seems like an 
unnecessary hack, since the JVM obviously can handle multiple
-                                               //  instances of the same JNI 
library being loaded in different class loaders, but
-                                               //  apparently not when coming 
from the same file path, so there we go)
-
-                                               final File rocksLibFolder = new 
File(tempDirParent, "rocksdb-lib-" + new AbstractID());
-
-                                               // make sure the temp path 
exists
-                                               LOG.debug("Attempting to create 
RocksDB native library folder {}", rocksLibFolder);
-                                               // noinspection 
ResultOfMethodCallIgnored
-                                               rocksLibFolder.mkdirs();
-
-                                               // explicitly load the JNI 
dependency if it has not been loaded before
-                                               
NativeLibraryLoader.getInstance().loadLibrary(rocksLibFolder.getAbsolutePath());
-
-                                               // this initialization here 
should validate that the loading succeeded
-                                               RocksDB.loadLibrary();
-
-                                               // seems to have worked
-                                               LOG.info("Successfully loaded 
RocksDB native library");
-                                               rocksDbInitialized = true;
-                                               return;
-                                       }
-                                       catch (Throwable t) {
-                                               lastException = t;
-                                               LOG.debug("RocksDB JNI library 
loading attempt {} failed", attempt, t);
-
-                                               // try to force RocksDB to 
attempt reloading the library
-                                               try {
-                                                       
resetRocksDBLoadedFlag();
-                                               } catch (Throwable tt) {
-                                                       LOG.debug("Failed to 
reset 'initialized' flag in RocksDB native code loader", tt);
-                                               }
-                                       }
-                               }
-
-                               throw new IOException("Could not load the 
native RocksDB library", lastException);
-                       }
-               }
-       }
-
-       @VisibleForTesting
-       static void resetRocksDBLoadedFlag() throws Exception {
-               final Field initField = 
org.rocksdb.NativeLibraryLoader.class.getDeclaredField("initialized");
-               initField.setAccessible(true);
-               initField.setBoolean(null, false);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
deleted file mode 100644
index 94e15fa..0000000
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.runtime.state.StateBackendFactory;
-
-import java.io.IOException;
-
-/**
- * A factory that creates an {@link 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend}
- * from a configuration.
- */
-public class RocksDBStateBackendFactory implements 
StateBackendFactory<RocksDBStateBackend> {
-
-       @Override
-       public RocksDBStateBackend createFromConfig(Configuration config)
-                       throws IllegalConfigurationException, IOException {
-
-               // we need to explicitly read the checkpoint directory here, 
because that
-               // is a required constructor parameter
-               final String checkpointDirURI = 
config.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
-               if (checkpointDirURI == null) {
-                       throw new IllegalConfigurationException(
-                               "Cannot create the RocksDB state backend: The 
configuration does not specify the " +
-                               "checkpoint directory '" + 
CheckpointingOptions.CHECKPOINTS_DIRECTORY.key() + '\'');
-               }
-
-               return new 
RocksDBStateBackend(checkpointDirURI).configure(config);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
deleted file mode 100644
index da21e8a..0000000
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.internal.InternalValueState;
-
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.WriteOptions;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-
-/**
- * {@link ValueState} implementation that stores state in RocksDB.
- *
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <V> The type of value that the state state stores.
- */
-public class RocksDBValueState<K, N, V>
-       extends AbstractRocksDBState<K, N, ValueState<V>, 
ValueStateDescriptor<V>, V>
-       implements InternalValueState<N, V> {
-
-       /** Serializer for the values. */
-       private final TypeSerializer<V> valueSerializer;
-
-       /**
-        * We disable writes to the write-ahead-log here. We can't have these 
in the base class
-        * because JNI segfaults for some reason if they are.
-        */
-       private final WriteOptions writeOptions;
-
-       /**
-        * Creates a new {@code RocksDBValueState}.
-        *
-        * @param namespaceSerializer The serializer for the namespace.
-        * @param stateDesc The state identifier for the state. This contains 
name
-        *                           and can create a default state value.
-        */
-       public RocksDBValueState(ColumnFamilyHandle columnFamily,
-                       TypeSerializer<N> namespaceSerializer,
-                       ValueStateDescriptor<V> stateDesc,
-                       RocksDBKeyedStateBackend<K> backend) {
-
-               super(columnFamily, namespaceSerializer, stateDesc, backend);
-               this.valueSerializer = stateDesc.getSerializer();
-
-               writeOptions = new WriteOptions();
-               writeOptions.setDisableWAL(true);
-       }
-
-       @Override
-       public V value() {
-               try {
-                       writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = keySerializationStream.toByteArray();
-                       byte[] valueBytes = backend.db.get(columnFamily, key);
-                       if (valueBytes == null) {
-                               return stateDesc.getDefaultValue();
-                       }
-                       return valueSerializer.deserialize(new 
DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
-               } catch (IOException | RocksDBException e) {
-                       throw new RuntimeException("Error while retrieving data 
from RocksDB.", e);
-               }
-       }
-
-       @Override
-       public void update(V value) throws IOException {
-               if (value == null) {
-                       clear();
-                       return;
-               }
-               DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream);
-               try {
-                       writeCurrentKeyWithGroupAndNamespace();
-                       byte[] key = keySerializationStream.toByteArray();
-                       keySerializationStream.reset();
-                       valueSerializer.serialize(value, out);
-                       backend.db.put(columnFamily, writeOptions, key, 
keySerializationStream.toByteArray());
-               } catch (Exception e) {
-                       throw new RuntimeException("Error while adding data to 
RocksDB", e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
deleted file mode 100644
index bae1f81..0000000
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.common.typeutils.base.VoidSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import 
org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.TestTaskStateManager;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.runtime.state.testutils.BackendForTestStream;
-import 
org.apache.flink.runtime.state.testutils.BackendForTestStream.StreamFactory;
-import org.apache.flink.runtime.state.testutils.TestCheckpointStreamFactory;
-import org.apache.flink.runtime.taskmanager.CheckpointResponder;
-import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.util.FutureUtil;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RunnableFuture;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-
-/**
- * Tests for asynchronous RocksDB Key/Value state checkpoints.
- */
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", 
"org.apache.log4j.*"})
-@SuppressWarnings("serial")
-public class RocksDBAsyncSnapshotTest extends TestLogger {
-
-       /**
-        * Temporary fold for test.
-        */
-       @Rule
-       public final TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-       /**
-        * This ensures that asynchronous state handles are actually 
materialized asynchronously.
-        *
-        * <p>We use latches to block at various stages and see if the code 
still continues through
-        * the parts that are not asynchronous. If the checkpoint is not done 
asynchronously the
-        * test will simply lock forever.
-        */
-       @Test
-       public void testFullyAsyncSnapshot() throws Exception {
-
-               final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<>(
-                               OneInputStreamTask::new,
-                               BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
-               testHarness.setupOutputForSingletonOperatorChain();
-
-               testHarness.configureForKeyedStream(new KeySelector<String, 
String>() {
-                       @Override
-                       public String getKey(String value) throws Exception {
-                               return value;
-                       }
-               }, BasicTypeInfo.STRING_TYPE_INFO);
-
-               StreamConfig streamConfig = testHarness.getStreamConfig();
-
-               File dbDir = temporaryFolder.newFolder();
-
-               RocksDBStateBackend backend = new RocksDBStateBackend(new 
MemoryStateBackend());
-               backend.setDbStoragePath(dbDir.getAbsolutePath());
-
-               streamConfig.setStateBackend(backend);
-
-               streamConfig.setStreamOperator(new AsyncCheckpointOperator());
-               streamConfig.setOperatorID(new OperatorID());
-
-               final OneShotLatch delayCheckpointLatch = new OneShotLatch();
-               final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
-
-               CheckpointResponder checkpointResponderMock = new 
CheckpointResponder() {
-
-                       @Override
-                       public void acknowledgeCheckpoint(
-                               JobID jobID,
-                               ExecutionAttemptID executionAttemptID,
-                               long checkpointId,
-                               CheckpointMetrics checkpointMetrics,
-                               TaskStateSnapshot subtaskState) {
-                               // block on the latch, to verify that 
triggerCheckpoint returns below,
-                               // even though the async checkpoint would not 
finish
-                               try {
-                                       delayCheckpointLatch.await();
-                               } catch (InterruptedException e) {
-                                       throw new RuntimeException(e);
-                               }
-
-                               boolean hasManagedKeyedState = false;
-                               for (Map.Entry<OperatorID, 
OperatorSubtaskState> entry : subtaskState.getSubtaskStateMappings()) {
-                                       OperatorSubtaskState state = 
entry.getValue();
-                                       if (state != null) {
-                                               hasManagedKeyedState |= 
state.getManagedKeyedState() != null;
-                                       }
-                               }
-
-                               // should be one k/v state
-                               assertTrue(hasManagedKeyedState);
-
-                               // we now know that the checkpoint went through
-                               ensureCheckpointLatch.trigger();
-                       }
-
-                       @Override
-                       public void declineCheckpoint(
-                               JobID jobID, ExecutionAttemptID 
executionAttemptID,
-                               long checkpointId, Throwable cause) {
-
-                       }
-               };
-
-               JobID jobID = new JobID();
-               ExecutionAttemptID executionAttemptID = new 
ExecutionAttemptID(0L, 0L);
-               TestTaskStateManager taskStateManagerTestMock = new 
TestTaskStateManager(
-                       jobID,
-                       executionAttemptID,
-                       checkpointResponderMock);
-
-               StreamMockEnvironment mockEnv = new StreamMockEnvironment(
-                       testHarness.jobConfig,
-                       testHarness.taskConfig,
-                       testHarness.memorySize,
-                       new MockInputSplitProvider(),
-                       testHarness.bufferSize,
-                       taskStateManagerTestMock);
-
-               testHarness.invoke(mockEnv);
-
-               final OneInputStreamTask<String, String> task = 
testHarness.getTask();
-
-               // wait for the task to be running
-               for (Field field: StreamTask.class.getDeclaredFields()) {
-                       if (field.getName().equals("isRunning")) {
-                               field.setAccessible(true);
-                               while (!field.getBoolean(task)) {
-                                       Thread.sleep(10);
-                               }
-                       }
-               }
-
-               task.triggerCheckpoint(new CheckpointMetaData(42, 17), 
CheckpointOptions.forCheckpointWithDefaultLocation());
-
-               testHarness.processElement(new StreamRecord<>("Wohoo", 0));
-
-               // now we allow the checkpoint
-               delayCheckpointLatch.trigger();
-
-               // wait for the checkpoint to go through
-               ensureCheckpointLatch.await();
-
-               testHarness.endInput();
-
-               ExecutorService threadPool = 
task.getAsyncOperationsThreadPool();
-               threadPool.shutdown();
-               Assert.assertTrue(threadPool.awaitTermination(60_000, 
TimeUnit.MILLISECONDS));
-
-               testHarness.waitForTaskCompletion();
-               if (mockEnv.wasFailedExternally()) {
-                       fail("Unexpected exception during execution.");
-               }
-       }
-
-       /**
-        * This tests ensures that canceling of asynchronous snapshots works as 
expected and does not block.
-        */
-       @Test
-       public void testCancelFullyAsyncCheckpoints() throws Exception {
-               final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<>(
-                               OneInputStreamTask::new,
-                               BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               testHarness.setupOutputForSingletonOperatorChain();
-
-               testHarness.configureForKeyedStream(value -> value, 
BasicTypeInfo.STRING_TYPE_INFO);
-
-               StreamConfig streamConfig = testHarness.getStreamConfig();
-
-               File dbDir = temporaryFolder.newFolder();
-
-               // this is the proper instance that we need to call.
-               BlockerCheckpointStreamFactory blockerCheckpointStreamFactory =
-                       new BlockerCheckpointStreamFactory(4 * 1024 * 1024) {
-
-                       int count = 1;
-
-                       @Override
-                       public CheckpointStateOutputStream 
createCheckpointStateOutputStream(CheckpointedStateScope scope) throws 
Exception {
-                               // we skip the first created stream, because it 
is used to checkpoint the timer service, which is
-                               // currently not asynchronous.
-                               if (count > 0) {
-                                       --count;
-                                       return new 
MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize);
-                               } else {
-                                       return 
super.createCheckpointStateOutputStream(scope);
-                               }
-                       }
-               };
-
-               // to avoid serialization of the above factory instance, we 
need to pass it in
-               // through a static variable
-
-               StateBackend stateBackend = new BackendForTestStream(new 
StaticForwardFactory(blockerCheckpointStreamFactory));
-
-               RocksDBStateBackend backend = new 
RocksDBStateBackend(stateBackend);
-               backend.setDbStoragePath(dbDir.getAbsolutePath());
-
-               streamConfig.setStateBackend(backend);
-
-               streamConfig.setStreamOperator(new AsyncCheckpointOperator());
-               streamConfig.setOperatorID(new OperatorID());
-
-               TestTaskStateManager taskStateManagerTestMock = new 
TestTaskStateManager();
-
-               StreamMockEnvironment mockEnv = new StreamMockEnvironment(
-                               testHarness.jobConfig,
-                               testHarness.taskConfig,
-                               testHarness.memorySize,
-                               new MockInputSplitProvider(),
-                               testHarness.bufferSize,
-                               taskStateManagerTestMock);
-
-               blockerCheckpointStreamFactory.setBlockerLatch(new 
OneShotLatch());
-               blockerCheckpointStreamFactory.setWaiterLatch(new 
OneShotLatch());
-
-               testHarness.invoke(mockEnv);
-
-               final OneInputStreamTask<String, String> task = 
testHarness.getTask();
-
-               // wait for the task to be running
-               for (Field field: StreamTask.class.getDeclaredFields()) {
-                       if (field.getName().equals("isRunning")) {
-                               field.setAccessible(true);
-                               while (!field.getBoolean(task)) {
-                                       Thread.sleep(10);
-                               }
-                       }
-               }
-
-               task.triggerCheckpoint(
-                       new CheckpointMetaData(42, 17),
-                       CheckpointOptions.forCheckpointWithDefaultLocation());
-
-               testHarness.processElement(new StreamRecord<>("Wohoo", 0));
-               blockerCheckpointStreamFactory.getWaiterLatch().await();
-               task.cancel();
-               blockerCheckpointStreamFactory.getBlockerLatch().trigger();
-               testHarness.endInput();
-               
Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed());
-
-               try {
-                       ExecutorService threadPool = 
task.getAsyncOperationsThreadPool();
-                       threadPool.shutdown();
-                       Assert.assertTrue(threadPool.awaitTermination(60_000, 
TimeUnit.MILLISECONDS));
-                       testHarness.waitForTaskCompletion();
-
-                       fail("Operation completed. Cancel failed.");
-               } catch (Exception expected) {
-
-                       Throwable cause = expected.getCause();
-
-                       if (!(cause instanceof CancelTaskException)) {
-                               fail("Unexpected exception: " + expected);
-                       }
-               }
-       }
-
-       /**
-        * Test that the snapshot files are cleaned up in case of a failure 
during the snapshot
-        * procedure.
-        */
-       @Test
-       public void testCleanupOfSnapshotsInFailureCase() throws Exception {
-               long checkpointId = 1L;
-               long timestamp = 42L;
-
-               Environment env = new DummyEnvironment("test task", 1, 0);
-
-               final IOException testException = new IOException("Test 
exception");
-               CheckpointStateOutputStream outputStream = spy(new 
FailingStream(testException));
-
-               RocksDBStateBackend backend = new 
RocksDBStateBackend((StateBackend) new MemoryStateBackend());
-
-               
backend.setDbStoragePath(temporaryFolder.newFolder().toURI().toString());
-
-               AbstractKeyedStateBackend<Void> keyedStateBackend = 
backend.createKeyedStateBackend(
-                       env,
-                       new JobID(),
-                       "test operator",
-                       VoidSerializer.INSTANCE,
-                       1,
-                       new KeyGroupRange(0, 0),
-                       null);
-
-               try {
-
-                       keyedStateBackend.restore(null);
-
-                       // register a state so that the state backend has to 
checkpoint something
-                       keyedStateBackend.getPartitionedState(
-                               "namespace",
-                               StringSerializer.INSTANCE,
-                               new ValueStateDescriptor<>("foobar", 
String.class));
-
-                       RunnableFuture<KeyedStateHandle> snapshotFuture = 
keyedStateBackend.snapshot(
-                               checkpointId, timestamp,
-                               new TestCheckpointStreamFactory(() -> 
outputStream),
-                               
CheckpointOptions.forCheckpointWithDefaultLocation());
-
-                       try {
-                               FutureUtil.runIfNotDoneAndGet(snapshotFuture);
-                               fail("Expected an exception to be thrown 
here.");
-                       } catch (ExecutionException e) {
-                               Assert.assertEquals(testException, 
e.getCause());
-                       }
-
-                       verify(outputStream).close();
-               } finally {
-                       IOUtils.closeQuietly(keyedStateBackend);
-                       keyedStateBackend.dispose();
-               }
-       }
-
-       @Test
-       public void testConsistentSnapshotSerializationFlagsAndMasks() {
-
-               Assert.assertEquals(0xFFFF, 
RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK);
-               Assert.assertEquals(0x80, 
RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
-
-               byte[] expectedKey = new byte[] {42, 42};
-               byte[] modKey = expectedKey.clone();
-
-               Assert.assertFalse(
-                       
RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey));
-
-               
RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.setMetaDataFollowsFlagInKey(modKey);
-               
Assert.assertTrue(RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey));
-
-               
RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(modKey);
-               Assert.assertFalse(
-                       
RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey));
-
-               Assert.assertTrue(Arrays.equals(expectedKey, modKey));
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static class AsyncCheckpointOperator
-               extends AbstractStreamOperator<String>
-               implements OneInputStreamOperator<String, String> {
-
-               @Override
-               public void open() throws Exception {
-                       super.open();
-
-                       // also get the state in open, this way we are sure 
that it was created before
-                       // we trigger the test checkpoint
-                       ValueState<String> state = getPartitionedState(
-                                       VoidNamespace.INSTANCE,
-                                       VoidNamespaceSerializer.INSTANCE,
-                                       new ValueStateDescriptor<>("count", 
StringSerializer.INSTANCE));
-
-               }
-
-               @Override
-               public void processElement(StreamRecord<String> element) throws 
Exception {
-                       // we also don't care
-
-                       ValueState<String> state = getPartitionedState(
-                                       VoidNamespace.INSTANCE,
-                                       VoidNamespaceSerializer.INSTANCE,
-                                       new ValueStateDescriptor<>("count", 
StringSerializer.INSTANCE));
-
-                       state.update(element.getValue());
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       // failing stream
-       // 
------------------------------------------------------------------------
-
-       private static class StaticForwardFactory implements StreamFactory {
-
-               static CheckpointStreamFactory factory;
-
-               StaticForwardFactory(CheckpointStreamFactory factory) {
-                       StaticForwardFactory.factory = factory;
-               }
-
-               @Override
-               public CheckpointStateOutputStream get() throws Exception {
-                       return 
factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
-               }
-       }
-
-       private static class FailingStream extends CheckpointStateOutputStream {
-
-               private final IOException testException;
-
-               FailingStream(IOException testException) {
-                       this.testException = testException;
-               }
-
-               @Override
-               public StreamStateHandle closeAndGetHandle() throws IOException 
{
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public long getPos() throws IOException {
-                       throw new UnsupportedOperationException();
-               }
-
-               @Override
-               public void write(int b) throws IOException {
-                       throw testException;
-               }
-
-               @Override
-               public void flush() throws IOException {
-                       throw testException;
-               }
-
-               @Override
-               public void sync() throws IOException {
-                       throw testException;
-               }
-
-               @Override
-               public void close() throws IOException {
-                       throw new UnsupportedOperationException();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
deleted file mode 100644
index 565f27d..0000000
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.junit.Test;
-
-/**
- * This test checks that the RocksDB native code loader still responds to resetting the init flag.
- */
-public class RocksDBInitResetTest {
-
-       @Test
-       public void testResetInitFlag() throws Exception {
-               RocksDBStateBackend.resetRocksDBLoadedFlag();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
deleted file mode 100644
index 1d14f6e..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.ColumnFamilyDescriptor;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksIterator;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-
-/**
- * Tests for the RocksDBMergeIterator.
- */
-public class RocksDBMergeIteratorTest {
-
-       private static final int NUM_KEY_VAL_STATES = 50;
-       private static final int MAX_NUM_KEYS = 20;
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Test
-       public void testEmptyMergeIterator() throws IOException {
-               RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator =
-                               new RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.EMPTY_LIST, 2);
-               Assert.assertFalse(emptyIterator.isValid());
-       }
-
-       @Test
-       public void testMergeIteratorByte() throws Exception {
-               Assert.assertTrue(MAX_NUM_KEYS <= Byte.MAX_VALUE);
-
-               testMergeIterator(Byte.MAX_VALUE);
-       }
-
-       @Test
-       public void testMergeIteratorShort() throws Exception {
-               Assert.assertTrue(MAX_NUM_KEYS <= Byte.MAX_VALUE);
-
-               testMergeIterator(Short.MAX_VALUE);
-       }
-
-       public void testMergeIterator(int maxParallelism) throws Exception {
-               Random random = new Random(1234);
-
-               RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath());
-               try {
-                       List<Tuple2<RocksIterator, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>();
-                       List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>();
-
-                       int totalKeysExpected = 0;
-
-                       for (int c = 0; c < NUM_KEY_VAL_STATES; ++c) {
-                               ColumnFamilyHandle handle = rocksDB.createColumnFamily(
-                                               new ColumnFamilyDescriptor(("column-" + c).getBytes(ConfigConstants.DEFAULT_CHARSET)));
-
-                               ByteArrayOutputStreamWithPos bos = new ByteArrayOutputStreamWithPos();
-                               DataOutputStream dos = new DataOutputStream(bos);
-
-                               int numKeys = random.nextInt(MAX_NUM_KEYS + 1);
-
-                               for (int i = 0; i < numKeys; ++i) {
-                                       if (maxParallelism <= Byte.MAX_VALUE) {
-                                               dos.writeByte(i);
-                                       } else {
-                                               dos.writeShort(i);
-                                       }
-                                       dos.writeInt(i);
-                                       byte[] key = bos.toByteArray();
-                                       byte[] val = new byte[]{42};
-                                       rocksDB.put(handle, key, val);
-
-                                       bos.reset();
-                               }
-                               columnFamilyHandlesWithKeyCount.add(new Tuple2<>(handle, numKeys));
-                               totalKeysExpected += numKeys;
-                       }
-
-                       int id = 0;
-                       for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle : columnFamilyHandlesWithKeyCount) {
-                               rocksIteratorsWithKVStateId.add(new Tuple2<>(rocksDB.newIterator(columnFamilyHandle.f0), id));
-                               ++id;
-                       }
-
-                       RocksDBKeyedStateBackend.RocksDBMergeIterator mergeIterator = new RocksDBKeyedStateBackend.RocksDBMergeIterator(rocksIteratorsWithKVStateId, maxParallelism <= Byte.MAX_VALUE ? 1 : 2);
-
-                       int prevKVState = -1;
-                       int prevKey = -1;
-                       int prevKeyGroup = -1;
-                       int totalKeysActual = 0;
-
-                       while (mergeIterator.isValid()) {
-                               ByteBuffer bb = ByteBuffer.wrap(mergeIterator.key());
-
-                               int keyGroup = maxParallelism > Byte.MAX_VALUE ? bb.getShort() : bb.get();
-                               int key = bb.getInt();
-
-                               Assert.assertTrue(keyGroup >= prevKeyGroup);
-                               Assert.assertTrue(key >= prevKey);
-                               Assert.assertEquals(prevKeyGroup != keyGroup, mergeIterator.isNewKeyGroup());
-                               Assert.assertEquals(prevKVState != mergeIterator.kvStateId(), mergeIterator.isNewKeyValueState());
-
-                               prevKeyGroup = keyGroup;
-                               prevKVState = mergeIterator.kvStateId();
-
-                               //System.out.println(keyGroup + " " + key + " " + mergeIterator.kvStateId());
-                               mergeIterator.next();
-                               ++totalKeysActual;
-                       }
-
-                       Assert.assertEquals(totalKeysExpected, totalKeysActual);
-
-                       for (Tuple2<ColumnFamilyHandle, Integer> handleWithCount : columnFamilyHandlesWithKeyCount) {
-                               rocksDB.dropColumnFamily(handleWithCount.f0);
-                       }
-               } finally {
-                       rocksDB.close();
-               }
-       }
-
-}

Reply via email to