http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java deleted file mode 100644 index e8c34cc..0000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ /dev/null @@ -1,532 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package org.apache.flink.contrib.streaming.state;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.Preconditions;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;

/**
 * {@link MapState} implementation that stores state in RocksDB.
 *
 * <p>Every map entry is stored as an individual RocksDB record. The record key has the format
 * {@code #KeyGroup#Key#Namespace#UserKey}, so all entries of one map share the
 * {@code #KeyGroup#Key#Namespace} prefix and are enumerated by scanning from that prefix until
 * a record with a different prefix is found. The record value is a one-byte null marker
 * followed, for non-null values, by the serialized user value.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <UK> The type of the keys in the map state.
 * @param <UV> The type of the values in the map state.
 */
public class RocksDBMapState<K, N, UK, UV>
	extends AbstractRocksDBState<K, N, MapState<UK, UV>, MapStateDescriptor<UK, UV>, Map<UK, UV>>
	implements InternalMapState<N, UK, UV> {

	private static final Logger LOG = LoggerFactory.getLogger(RocksDBMapState.class);

	/** Serializer for the user keys of the map. */
	private final TypeSerializer<UK> userKeySerializer;

	/** Serializer for the user values of the map. */
	private final TypeSerializer<UV> userValueSerializer;

	/**
	 * We disable writes to the write-ahead-log here. We can't have these in the base class
	 * because JNI segfaults for some reason if they are.
	 */
	private final WriteOptions writeOptions;

	/**
	 * Creates a new {@code RocksDBMapState}.
	 *
	 * @param columnFamily The column family that holds the records of this state.
	 * @param namespaceSerializer The serializer for the namespace.
	 * @param stateDesc The state identifier for the state.
	 * @param backend The backend whose RocksDB instance stores the records.
	 */
	public RocksDBMapState(ColumnFamilyHandle columnFamily,
			TypeSerializer<N> namespaceSerializer,
			MapStateDescriptor<UK, UV> stateDesc,
			RocksDBKeyedStateBackend<K> backend) {

		super(columnFamily, namespaceSerializer, stateDesc, backend);

		this.userKeySerializer = stateDesc.getKeySerializer();
		this.userValueSerializer = stateDesc.getValueSerializer();

		writeOptions = new WriteOptions();
		writeOptions.setDisableWAL(true);
	}

	// ------------------------------------------------------------------------
	//  MapState Implementation
	// ------------------------------------------------------------------------

	/**
	 * Looks up the value stored for the given user key under the current key and namespace.
	 *
	 * @return The stored value, or {@code null} if no record exists (or a null value was stored).
	 */
	@Override
	public UV get(UK userKey) throws IOException, RocksDBException {
		byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
		byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);

		return (rawValueBytes == null ? null : deserializeUserValue(rawValueBytes));
	}

	/** Writes (or overwrites) the record for the given user key. */
	@Override
	public void put(UK userKey, UV userValue) throws IOException, RocksDBException {

		byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
		byte[] rawValueBytes = serializeUserValue(userValue);

		backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
	}

	/** Writes every entry of the given map; a {@code null} map is ignored. */
	@Override
	public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
		if (map == null) {
			return;
		}

		for (Map.Entry<UK, UV> entry : map.entrySet()) {
			put(entry.getKey(), entry.getValue());
		}
	}

	/** Deletes the record for the given user key. */
	@Override
	public void remove(UK userKey) throws IOException, RocksDBException {
		byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);

		// delete(...) is the non-deprecated name of remove(...) in the RocksDB Java API.
		backend.db.delete(columnFamily, writeOptions, rawKeyBytes);
	}

	/** Returns whether a record exists for the given user key. */
	@Override
	public boolean contains(UK userKey) throws IOException, RocksDBException {
		byte[] rawKeyBytes = serializeUserKeyWithCurrentKeyAndNamespace(userKey);
		byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);

		return (rawValueBytes != null);
	}

	/**
	 * Returns the entries of the map under the current key and namespace.
	 *
	 * <p>The returned {@code Iterable} wraps a single iterator instance, so it can only be
	 * traversed once.
	 *
	 * @return The entries, or {@code null} if the map is empty.
	 */
	@Override
	public Iterable<Map.Entry<UK, UV>> entries() throws IOException, RocksDBException {
		final Iterator<Map.Entry<UK, UV>> iterator = iterator();

		// Return null to make the behavior consistent with other states.
		if (!iterator.hasNext()) {
			return null;
		} else {
			return new Iterable<Map.Entry<UK, UV>>() {
				@Override
				public Iterator<Map.Entry<UK, UV>> iterator() {
					return iterator;
				}
			};
		}
	}

	/** Returns the user keys of the map under the current key and namespace. */
	@Override
	public Iterable<UK> keys() throws IOException, RocksDBException {
		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();

		return new Iterable<UK>() {
			@Override
			public Iterator<UK> iterator() {
				return new RocksDBMapIterator<UK>(backend.db, prefixBytes) {
					@Override
					public UK next() {
						RocksDBMapEntry entry = nextEntry();
						return (entry == null ? null : entry.getKey());
					}
				};
			}
		};
	}

	/** Returns the user values of the map under the current key and namespace. */
	@Override
	public Iterable<UV> values() throws IOException, RocksDBException {
		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();

		return new Iterable<UV>() {
			@Override
			public Iterator<UV> iterator() {
				return new RocksDBMapIterator<UV>(backend.db, prefixBytes) {
					@Override
					public UV next() {
						RocksDBMapEntry entry = nextEntry();
						return (entry == null ? null : entry.getValue());
					}
				};
			}
		};
	}

	/** Returns an iterator over the entries of the map under the current key and namespace. */
	@Override
	public Iterator<Map.Entry<UK, UV>> iterator() throws IOException, RocksDBException {
		final byte[] prefixBytes = serializeCurrentKeyAndNamespace();

		return new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, prefixBytes) {
			@Override
			public Map.Entry<UK, UV> next() {
				return nextEntry();
			}
		};
	}

	/**
	 * Removes all entries of the map under the current key and namespace.
	 *
	 * <p>Failures are logged and not propagated, so the map may be only partially cleared
	 * when an error occurs.
	 */
	@Override
	public void clear() {
		try {
			Iterator<Map.Entry<UK, UV>> iterator = iterator();

			while (iterator.hasNext()) {
				iterator.next();
				iterator.remove();
			}
		} catch (Exception e) {
			LOG.warn("Error while cleaning the state.", e);
		}
	}

	/**
	 * Serializes the whole map stored under the given serialized key and namespace.
	 *
	 * @param serializedKeyAndNamespace The serialized key and namespace, must not be null.
	 * @return The serialized map, or {@code null} if no entry exists for the key and namespace.
	 */
	@Override
	@SuppressWarnings("unchecked")
	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
		Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");

		//TODO make KvStateSerializer key-group aware to save this round trip and key-group computation
		Tuple2<K, N> des = KvStateSerializer.deserializeKeyAndNamespace(
				serializedKeyAndNamespace,
				backend.getKeySerializer(),
				namespaceSerializer);

		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());

		// Uses private buffers instead of the shared key serialization stream of this state.
		ByteArrayOutputStreamWithPos outputStream = new ByteArrayOutputStreamWithPos(128);
		DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(outputStream);
		writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1, outputStream, outputView);
		final byte[] keyPrefixBytes = outputStream.toByteArray();

		final Iterator<Map.Entry<UK, UV>> iterator = new RocksDBMapIterator<Map.Entry<UK, UV>>(backend.db, keyPrefixBytes) {
			@Override
			public Map.Entry<UK, UV> next() {
				return nextEntry();
			}
		};

		// Return null to make the behavior consistent with other backends
		if (!iterator.hasNext()) {
			return null;
		}

		return KvStateSerializer.serializeMap(new Iterable<Map.Entry<UK, UV>>() {
			@Override
			public Iterator<Map.Entry<UK, UV>> iterator() {
				return iterator;
			}
		}, userKeySerializer, userValueSerializer);
	}

	// ------------------------------------------------------------------------
	//  Serialization Methods
	// ------------------------------------------------------------------------

	/** Serializes the prefix (key group, current key, namespace) shared by all entries of the map. */
	private byte[] serializeCurrentKeyAndNamespace() throws IOException {
		writeCurrentKeyWithGroupAndNamespace();

		return keySerializationStream.toByteArray();
	}

	/** Serializes the full record key: the shared prefix followed by the user key. */
	private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException {
		writeCurrentKeyWithGroupAndNamespace();
		userKeySerializer.serialize(userKey, keySerializationDataOutputView);

		return keySerializationStream.toByteArray();
	}

	/**
	 * Serializes a user value as a null marker ({@code true} for null) followed by the value.
	 *
	 * <p>This reuses (and resets) the shared key serialization stream, so any key bytes needed
	 * afterwards must have been copied out of the stream before calling this method.
	 */
	private byte[] serializeUserValue(UV userValue) throws IOException {
		keySerializationStream.reset();

		if (userValue == null) {
			keySerializationDataOutputView.writeBoolean(true);
		} else {
			keySerializationDataOutputView.writeBoolean(false);
			userValueSerializer.serialize(userValue, keySerializationDataOutputView);
		}

		return keySerializationStream.toByteArray();
	}

	/** Skips the key group / key / namespace prefix of a record key and reads the user key. */
	private UK deserializeUserKey(byte[] rawKeyBytes) throws IOException {
		ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawKeyBytes);
		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);

		readKeyWithGroupAndNamespace(bais, in);

		return userKeySerializer.deserialize(in);
	}

	/** Reads the null marker of a record value and, if the value is not null, the user value. */
	private UV deserializeUserValue(byte[] rawValueBytes) throws IOException {
		ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawValueBytes);
		DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);

		boolean isNull = in.readBoolean();

		return isNull ? null : userValueSerializer.deserialize(in);
	}

	// ------------------------------------------------------------------------
	//  Internal Classes
	// ------------------------------------------------------------------------

	/** A map entry in RocksDBMapState. */
	private class RocksDBMapEntry implements Map.Entry<UK, UV> {
		private final RocksDB db;

		/** The raw bytes of the key stored in RocksDB. Each user key is stored in RocksDB
		 * with the format #KeyGroup#Key#Namespace#UserKey. */
		private final byte[] rawKeyBytes;

		/** The raw bytes of the value stored in RocksDB. */
		private byte[] rawValueBytes;

		/** True if the entry has been deleted. */
		private boolean deleted;

		/** The user key and value. The deserialization is performed lazily, i.e. the key
		 * and the value is deserialized only when they are accessed. */
		private UK userKey = null;
		private UV userValue = null;

		RocksDBMapEntry(final RocksDB db, final byte[] rawKeyBytes, final byte[] rawValueBytes) {
			this.db = db;

			this.rawKeyBytes = rawKeyBytes;
			this.rawValueBytes = rawValueBytes;
			this.deleted = false;
		}

		/** Marks the entry as deleted and deletes its record from RocksDB. */
		public void remove() {
			deleted = true;
			rawValueBytes = null;

			try {
				// delete(...) is the non-deprecated name of remove(...) in the RocksDB Java API.
				db.delete(columnFamily, writeOptions, rawKeyBytes);
			} catch (RocksDBException e) {
				throw new RuntimeException("Error while removing data from RocksDB.", e);
			}
		}

		@Override
		public UK getKey() {
			if (userKey == null) {
				try {
					userKey = deserializeUserKey(rawKeyBytes);
				} catch (IOException e) {
					// Keep the cause so that the actual deserialization problem is diagnosable.
					throw new RuntimeException("Error while deserializing the user key.", e);
				}
			}

			return userKey;
		}

		@Override
		public UV getValue() {
			if (deleted) {
				return null;
			} else {
				if (userValue == null) {
					try {
						userValue = deserializeUserValue(rawValueBytes);
					} catch (IOException e) {
						// Keep the cause so that the actual deserialization problem is diagnosable.
						throw new RuntimeException("Error while deserializing the user value.", e);
					}
				}

				return userValue;
			}
		}

		/**
		 * Replaces the value of this entry and writes the new value through to RocksDB.
		 *
		 * @return The previous value of the entry.
		 * @throws IllegalStateException If the entry has already been removed.
		 */
		@Override
		public UV setValue(UV value) {
			if (deleted) {
				throw new IllegalStateException("The value has already been deleted.");
			}

			UV oldValue = getValue();

			try {
				userValue = value;
				rawValueBytes = serializeUserValue(value);

				db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
			} catch (IOException | RocksDBException e) {
				throw new RuntimeException("Error while putting data into RocksDB.", e);
			}

			return oldValue;
		}
	}

	/** An auxiliary utility to scan all entries under the given key. */
	private abstract class RocksDBMapIterator<T> implements Iterator<T> {

		static final int CACHE_SIZE_BASE = 1;
		static final int CACHE_SIZE_LIMIT = 128;

		/** The db where data resides. */
		private final RocksDB db;

		/**
		 * The prefix bytes of the key being accessed. All entries under the same key
		 * have the same prefix, hence we can stop iterating once we come across an
		 * entry with a different prefix.
		 */
		private final byte[] keyPrefixBytes;

		/**
		 * True if all entries have been accessed or the iterator has come across an
		 * entry with a different prefix.
		 */
		private boolean expired = false;

		/** An in-memory cache for the entries read from RocksDB. */
		private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();

		/** Index of the next cached entry to hand out. */
		private int cacheIndex = 0;

		RocksDBMapIterator(final RocksDB db, final byte[] keyPrefixBytes) {
			this.db = db;
			this.keyPrefixBytes = keyPrefixBytes;
		}

		@Override
		public boolean hasNext() {
			loadCache();

			return (cacheIndex < cacheEntries.size());
		}

		@Override
		public void remove() {
			if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) {
				throw new IllegalStateException("The remove operation must be called after an valid next operation.");
			}

			RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1);
			lastEntry.remove();
		}

		/**
		 * Returns the next cached entry, reloading the cache if necessary.
		 *
		 * @return The next entry, or {@code null} if the scan is exhausted.
		 */
		final RocksDBMapEntry nextEntry() {
			loadCache();

			if (cacheIndex == cacheEntries.size()) {
				if (!expired) {
					throw new IllegalStateException();
				}

				return null;
			}

			RocksDBMapEntry entry = cacheEntries.get(cacheIndex);
			cacheIndex++;

			return entry;
		}

		/**
		 * Refills the cache with the next batch of entries once all cached entries have been
		 * handed out. The batch size starts at {@link #CACHE_SIZE_BASE} and doubles on every
		 * reload, up to {@link #CACHE_SIZE_LIMIT}.
		 */
		private void loadCache() {
			if (cacheIndex > cacheEntries.size()) {
				throw new IllegalStateException();
			}

			// Load cache entries only when the cache is empty and there still exist unread entries
			if (cacheIndex < cacheEntries.size() || expired) {
				return;
			}

			/*
			 * The iteration starts from the prefix bytes at the first loading. The cache then is
			 * reloaded when the next entry to return is the last one in the cache. At that time,
			 * we will start the iterating from the last returned entry.
			 */
			final RocksDBMapEntry lastEntry = cacheEntries.isEmpty() ? null : cacheEntries.get(cacheEntries.size() - 1);
			final byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
			final int numEntries = (lastEntry == null ? CACHE_SIZE_BASE : Math.min(cacheEntries.size() * 2, CACHE_SIZE_LIMIT));

			final RocksIterator iterator = db.newIterator(columnFamily);

			// The iterator holds native resources, so it must be closed even if the scan fails.
			try {
				cacheEntries.clear();
				cacheIndex = 0;

				iterator.seek(startBytes);

				/*
				 * If the last returned entry is not deleted, it will be the first entry in the
				 * iterating. Skip it to avoid redundant access in such cases.
				 */
				if (lastEntry != null && !lastEntry.deleted) {
					iterator.next();
				}

				while (true) {
					if (!iterator.isValid()) {
						expired = true;
						break;
					}

					// Read the key only once per entry.
					final byte[] rawKeyBytes = iterator.key();
					if (!underSameKey(rawKeyBytes)) {
						expired = true;
						break;
					}

					if (cacheEntries.size() >= numEntries) {
						break;
					}

					cacheEntries.add(new RocksDBMapEntry(db, rawKeyBytes, iterator.value()));

					iterator.next();
				}
			} finally {
				iterator.close();
			}
		}

		/** Returns whether the given raw record key starts with the prefix of this iterator. */
		private boolean underSameKey(byte[] rawKeyBytes) {
			if (rawKeyBytes.length < keyPrefixBytes.length) {
				return false;
			}

			for (int i = 0; i < keyPrefixBytes.length; ++i) {
				if (rawKeyBytes[i] != keyPrefixBytes[i]) {
					return false;
				}
			}

			return true;
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java deleted file mode 100644 index b4c3f51..0000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package org.apache.flink.contrib.streaming.state;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.internal.InternalReducingState;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Collection;

/**
 * {@link ReducingState} implementation that stores state in RocksDB.
 *
 * <p>The state keeps a single, already reduced value per key and namespace. Adding a value
 * reads the stored value, applies the user's {@link ReduceFunction} and writes the result back.
 *
 * @param <K> The type of the key.
 * @param <N> The type of the namespace.
 * @param <V> The type of value that the state stores.
 */
public class RocksDBReducingState<K, N, V>
	extends AbstractRocksDBState<K, N, ReducingState<V>, ReducingStateDescriptor<V>, V>
	implements InternalReducingState<N, V> {

	/** Serializer for the values. */
	private final TypeSerializer<V> valueSerializer;

	/** User-specified reduce function. */
	private final ReduceFunction<V> reduceFunction;

	/**
	 * We disable writes to the write-ahead-log here. We can't have these in the base class
	 * because JNI segfaults for some reason if they are.
	 */
	private final WriteOptions writeOptions;

	/**
	 * Creates a new {@code RocksDBReducingState}.
	 *
	 * @param columnFamily The column family that holds the records of this state.
	 * @param namespaceSerializer The serializer for the namespace.
	 * @param stateDesc The state identifier for the state. This contains name
	 *                     and can create a default state value.
	 * @param backend The backend whose RocksDB instance stores the records.
	 */
	public RocksDBReducingState(ColumnFamilyHandle columnFamily,
			TypeSerializer<N> namespaceSerializer,
			ReducingStateDescriptor<V> stateDesc,
			RocksDBKeyedStateBackend<K> backend) {

		super(columnFamily, namespaceSerializer, stateDesc, backend);
		this.valueSerializer = stateDesc.getSerializer();
		this.reduceFunction = stateDesc.getReduceFunction();

		writeOptions = new WriteOptions();
		writeOptions.setDisableWAL(true);
	}

	/**
	 * Returns the reduced value stored under the current key and namespace.
	 *
	 * @return The stored value, or {@code null} if nothing is stored.
	 * @throws RuntimeException If reading from RocksDB or deserializing the value fails.
	 */
	@Override
	public V get() {
		try {
			writeCurrentKeyWithGroupAndNamespace();
			byte[] key = keySerializationStream.toByteArray();
			byte[] valueBytes = backend.db.get(columnFamily, key);
			if (valueBytes == null) {
				return null;
			}
			return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
		} catch (IOException | RocksDBException e) {
			throw new RuntimeException("Error while retrieving data from RocksDB", e);
		}
	}

	/**
	 * Adds a value to the state. If a value is already stored, the stored value and the new
	 * value are combined with the reduce function; otherwise the new value is stored as is.
	 *
	 * @throws RuntimeException If accessing RocksDB, (de)serialization or the reduce function fails.
	 */
	@Override
	public void add(V value) throws IOException {
		try {
			writeCurrentKeyWithGroupAndNamespace();
			// Copy the key bytes out, because the stream is reused below for the value.
			byte[] key = keySerializationStream.toByteArray();
			byte[] valueBytes = backend.db.get(columnFamily, key);

			DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);

			V newValue = value;
			if (valueBytes != null) {
				V oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
				newValue = reduceFunction.reduce(oldValue, value);
			}

			keySerializationStream.reset();
			valueSerializer.serialize(newValue, out);
			backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
		} catch (Exception e) {
			throw new RuntimeException("Error while adding data to RocksDB", e);
		}
	}

	/**
	 * Merges the values of the source namespaces into the target namespace for the current key.
	 * The records of the source namespaces are deleted. Null or empty {@code sources} is a no-op,
	 * and null elements of {@code sources} are skipped.
	 *
	 * @param target The namespace that receives the merged value.
	 * @param sources The namespaces whose values are merged and removed.
	 * @throws Exception If accessing RocksDB, (de)serialization or the reduce function fails.
	 */
	@Override
	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
		if (sources == null || sources.isEmpty()) {
			return;
		}

		// cache key and namespace
		final K key = backend.getCurrentKey();
		final int keyGroup = backend.getCurrentKeyGroupIndex();

		try {
			V current = null;

			// merge the sources to the target
			for (N source : sources) {
				if (source != null) {

					writeKeyWithGroupAndNamespace(
							keyGroup, key, source,
							keySerializationStream, keySerializationDataOutputView);

					final byte[] sourceKey = keySerializationStream.toByteArray();
					final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);

					// Use the WAL-disabled write options, like every other write of this state.
					backend.db.delete(columnFamily, writeOptions, sourceKey);

					if (valueBytes != null) {
						V value = valueSerializer.deserialize(
								new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));

						if (current != null) {
							current = reduceFunction.reduce(current, value);
						}
						else {
							current = value;
						}
					}
				}
			}

			// if something came out of merging the sources, merge it or write it to the target
			if (current != null) {
				// create the target full-binary-key
				writeKeyWithGroupAndNamespace(
						keyGroup, key, target,
						keySerializationStream, keySerializationDataOutputView);

				final byte[] targetKey = keySerializationStream.toByteArray();
				final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);

				if (targetValueBytes != null) {
					// target also had a value, merge
					V value = valueSerializer.deserialize(
							new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(targetValueBytes)));

					current = reduceFunction.reduce(current, value);
				}

				// serialize the resulting value
				keySerializationStream.reset();
				valueSerializer.serialize(current, keySerializationDataOutputView);

				// write the resulting value
				backend.db.put(columnFamily, writeOptions, targetKey, keySerializationStream.toByteArray());
			}
		}
		catch (Exception e) {
			throw new Exception("Error while merging state in RocksDB", e);
		}
	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java deleted file mode 100644 index b7e4794..0000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ /dev/null @@ -1,691 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.CheckpointStorage; -import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation; -import org.apache.flink.runtime.state.ConfigurableStateBackend; -import org.apache.flink.runtime.state.DefaultOperatorStateBackend; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.OperatorStateBackend; -import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.util.AbstractID; - -import org.rocksdb.ColumnFamilyOptions; -import org.rocksdb.DBOptions; -import org.rocksdb.NativeLibraryLoader; -import org.rocksdb.RocksDB; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Field; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Random; -import java.util.UUID; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A State Backend that stores its state in {@code RocksDB}. This state backend can - * store very large state that exceeds memory and spills to disk. 
- * - * <p>All key/value state (including windows) is stored in the key/value index of RocksDB. - * For persistence against loss of machines, checkpoints take a snapshot of the - * RocksDB database, and persist that snapshot in a file system (by default) or - * another configurable state backend. - * - * <p>The behavior of the RocksDB instances can be parametrized by setting RocksDB Options - * using the methods {@link #setPredefinedOptions(PredefinedOptions)} and - * {@link #setOptions(OptionsFactory)}. - */ -public class RocksDBStateBackend extends AbstractStateBackend implements ConfigurableStateBackend { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackend.class); - - /** The number of (re)tries for loading the RocksDB JNI library. */ - private static final int ROCKSDB_LIB_LOADING_ATTEMPTS = 3; - - private static boolean rocksDbInitialized = false; - - // ------------------------------------------------------------------------ - - // -- configuration values, set in the application / configuration - - /** The state backend that we use for creating checkpoint streams. */ - private final StateBackend checkpointStreamBackend; - - /** Base paths for RocksDB directory, as configured. - * Null if not yet set, in which case the configuration values will be used. - * The configuration defaults to the TaskManager's temp directories. */ - @Nullable - private Path[] localRocksDbDirectories; - - /** The pre-configured option settings. */ - private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT; - - /** The options factory to create the RocksDB options in the cluster. */ - @Nullable - private OptionsFactory optionsFactory; - - /** True if incremental checkpointing is enabled. - * Null if not yet set, in which case the configuration values will be used. 
*/ - @Nullable - private Boolean enableIncrementalCheckpointing; - - // -- runtime values, set on TaskManager when initializing / using the backend - - /** Base paths for RocksDB directory, as initialized. */ - private transient File[] initializedDbBasePaths; - - /** JobID for uniquifying backup paths. */ - private transient JobID jobId; - - /** The index of the next directory to be used from {@link #initializedDbBasePaths}.*/ - private transient int nextDirectory; - - /** Whether we already lazily initialized our local storage directories. */ - private transient boolean isInitialized; - - // ------------------------------------------------------------------------ - - /** - * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the - * file system and location defined by the given URI. - * - * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system - * host and port in the URI, or have the Hadoop configuration that describes the file system - * (host / high-availability group / possibly credentials) either referenced from the Flink - * config, or included in the classpath. - * - * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory. - * @throws IOException Thrown, if no file system can be found for the scheme in the URI. - */ - public RocksDBStateBackend(String checkpointDataUri) throws IOException { - this(new Path(checkpointDataUri).toUri()); - } - - /** - * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the - * file system and location defined by the given URI. - * - * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system - * host and port in the URI, or have the Hadoop configuration that describes the file system - * (host / high-availability group / possibly credentials) either referenced from the Flink - * config, or included in the classpath. 
- * - * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory. - * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled. - * @throws IOException Thrown, if no file system can be found for the scheme in the URI. - */ - public RocksDBStateBackend(String checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException { - this(new Path(checkpointDataUri).toUri(), enableIncrementalCheckpointing); - } - - /** - * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the - * file system and location defined by the given URI. - * - * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system - * host and port in the URI, or have the Hadoop configuration that describes the file system - * (host / high-availability group / possibly credentials) either referenced from the Flink - * config, or included in the classpath. - * - * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory. - * @throws IOException Thrown, if no file system can be found for the scheme in the URI. - */ - public RocksDBStateBackend(URI checkpointDataUri) throws IOException { - this(new FsStateBackend(checkpointDataUri)); - } - - /** - * Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the - * file system and location defined by the given URI. - * - * <p>A state backend that stores checkpoints in HDFS or S3 must specify the file system - * host and port in the URI, or have the Hadoop configuration that describes the file system - * (host / high-availability group / possibly credentials) either referenced from the Flink - * config, or included in the classpath. - * - * @param checkpointDataUri The URI describing the filesystem and path to the checkpoint data directory. - * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled. 
- * @throws IOException Thrown, if no file system can be found for the scheme in the URI. - */ - public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException { - this(new FsStateBackend(checkpointDataUri), enableIncrementalCheckpointing); - } - - /** - * Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its - * checkpoint data streams. Typically, one would supply a filesystem or database state backend - * here where the snapshots from RocksDB would be stored. - * - * <p>The snapshots of the RocksDB state will be stored using the given backend's - * {@link StateBackend#createCheckpointStorage(JobID)}. - * - * @param checkpointStreamBackend The backend write the checkpoint streams to. - */ - public RocksDBStateBackend(StateBackend checkpointStreamBackend) { - this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend); - } - - /** - * Creates a new {@code RocksDBStateBackend} that uses the given state backend to store its - * checkpoint data streams. Typically, one would supply a filesystem or database state backend - * here where the snapshots from RocksDB would be stored. - * - * <p>The snapshots of the RocksDB state will be stored using the given backend's - * {@link StateBackend#createCheckpointStorage(JobID)}. - * - * @param checkpointStreamBackend The backend write the checkpoint streams to. - * @param enableIncrementalCheckpointing True if incremental checkpointing is enabled. - */ - public RocksDBStateBackend(StateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) { - this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend); - this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; - } - - /** - * @deprecated Use {@link #RocksDBStateBackend(StateBackend)} instead. 
- */ - @Deprecated - public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) { - this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend); - } - - /** - * @deprecated Use {@link #RocksDBStateBackend(StateBackend, boolean)} instead. - */ - @Deprecated - public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) { - this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend); - this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; - } - - /** - * Private constructor that creates a re-configured copy of the state backend. - * - * @param original The state backend to re-configure. - * @param config The configuration. - */ - private RocksDBStateBackend(RocksDBStateBackend original, Configuration config) { - // reconfigure the state backend backing the streams - final StateBackend originalStreamBackend = original.checkpointStreamBackend; - this.checkpointStreamBackend = originalStreamBackend instanceof ConfigurableStateBackend ? 
- ((ConfigurableStateBackend) originalStreamBackend).configure(config) : - originalStreamBackend; - - // configure incremental checkpoints - if (original.enableIncrementalCheckpointing != null) { - this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing; - } - else { - this.enableIncrementalCheckpointing = - config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS); - } - - // configure local directories - if (original.localRocksDbDirectories != null) { - this.localRocksDbDirectories = original.localRocksDbDirectories; - } - else { - final String rocksdbLocalPaths = config.getString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES); - if (rocksdbLocalPaths != null) { - String[] directories = rocksdbLocalPaths.split(",|" + File.pathSeparator); - - try { - setDbStoragePaths(directories); - } - catch (IllegalArgumentException e) { - throw new IllegalConfigurationException("Invalid configuration for RocksDB state " + - "backend's local storage directories: " + e.getMessage(), e); - } - } - } - - // copy remaining settings - this.predefinedOptions = original.predefinedOptions; - this.optionsFactory = original.optionsFactory; - } - - // ------------------------------------------------------------------------ - // Reconfiguration - // ------------------------------------------------------------------------ - - /** - * Creates a copy of this state backend that uses the values defined in the configuration - * for fields where that were not yet specified in this state backend. 
- * - * @param config the configuration - * @return The re-configured variant of the state backend - */ - @Override - public RocksDBStateBackend configure(Configuration config) { - return new RocksDBStateBackend(this, config); - } - - // ------------------------------------------------------------------------ - // State backend methods - // ------------------------------------------------------------------------ - - /** - * Gets the state backend that this RocksDB state backend uses to persist - * its bytes to. - * - * <p>This RocksDB state backend only implements the RocksDB specific parts, it - * relies on the 'CheckpointBackend' to persist the checkpoint and savepoint bytes - * streams. - */ - public StateBackend getCheckpointBackend() { - return checkpointStreamBackend; - } - - private void lazyInitializeForJob( - Environment env, - @SuppressWarnings("unused") String operatorIdentifier) throws IOException { - - if (isInitialized) { - return; - } - - this.jobId = env.getJobID(); - - // initialize the paths where the local RocksDB files should be stored - if (localRocksDbDirectories == null) { - // initialize from the temp directories - initializedDbBasePaths = env.getIOManager().getSpillingDirectories(); - } - else { - List<File> dirs = new ArrayList<>(localRocksDbDirectories.length); - String errorMessage = ""; - - for (Path path : localRocksDbDirectories) { - File f = new File(path.toUri().getPath()); - File testDir = new File(f, UUID.randomUUID().toString()); - if (!testDir.mkdirs()) { - String msg = "Local DB files directory '" + path - + "' does not exist and cannot be created. "; - LOG.error(msg); - errorMessage += msg; - } else { - dirs.add(f); - } - //noinspection ResultOfMethodCallIgnored - testDir.delete(); - } - - if (dirs.isEmpty()) { - throw new IOException("No local storage directories available. 
" + errorMessage); - } else { - initializedDbBasePaths = dirs.toArray(new File[dirs.size()]); - } - } - - nextDirectory = new Random().nextInt(initializedDbBasePaths.length); - - isInitialized = true; - } - - private File getNextStoragePath() { - int ni = nextDirectory + 1; - ni = ni >= initializedDbBasePaths.length ? 0 : ni; - nextDirectory = ni; - - return initializedDbBasePaths[ni]; - } - - // ------------------------------------------------------------------------ - // Checkpoint initialization and persistent storage - // ------------------------------------------------------------------------ - - @Override - public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException { - return checkpointStreamBackend.resolveCheckpoint(pointer); - } - - @Override - public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException { - return checkpointStreamBackend.createCheckpointStorage(jobId); - } - - // ------------------------------------------------------------------------ - // State holding data structures - // ------------------------------------------------------------------------ - - @Override - public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend( - Environment env, - JobID jobID, - String operatorIdentifier, - TypeSerializer<K> keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - TaskKvStateRegistry kvStateRegistry) throws IOException { - - // first, make sure that the RocksDB JNI library is loaded - // we do this explicitly here to have better error handling - String tempDir = env.getTaskManagerInfo().getTmpDirectories()[0]; - ensureRocksDBIsLoaded(tempDir); - - lazyInitializeForJob(env, operatorIdentifier); - - File instanceBasePath = - new File(getNextStoragePath(), "job-" + jobId + "_op-" + operatorIdentifier + "_uuid-" + UUID.randomUUID()); - - return new RocksDBKeyedStateBackend<>( - operatorIdentifier, - env.getUserClassLoader(), - instanceBasePath, - getDbOptions(), - 
getColumnOptions(), - kvStateRegistry, - keySerializer, - numberOfKeyGroups, - keyGroupRange, - env.getExecutionConfig(), - isIncrementalCheckpointsEnabled()); - } - - @Override - public OperatorStateBackend createOperatorStateBackend( - Environment env, - String operatorIdentifier) throws Exception { - - //the default for RocksDB; eventually there can be a operator state backend based on RocksDB, too. - final boolean asyncSnapshots = true; - return new DefaultOperatorStateBackend( - env.getUserClassLoader(), - env.getExecutionConfig(), - asyncSnapshots); - } - - // ------------------------------------------------------------------------ - // Parameters - // ------------------------------------------------------------------------ - - /** - * Sets the path where the RocksDB local database files should be stored on the local - * file system. Setting this path overrides the default behavior, where the - * files are stored across the configured temp directories. - * - * <p>Passing {@code null} to this function restores the default behavior, where the configured - * temp directories will be used. - * - * @param path The path where the local RocksDB database files are stored. - */ - public void setDbStoragePath(String path) { - setDbStoragePaths(path == null ? null : new String[] { path }); - } - - /** - * Sets the paths across which the local RocksDB database files are distributed on the local - * file system. Setting these paths overrides the default behavior, where the - * files are stored across the configured temp directories. - * - * <p>Each distinct state will be stored in one path, but when the state backend creates - * multiple states, they will store their files on different paths. - * - * <p>Passing {@code null} to this function restores the default behavior, where the configured - * temp directories will be used. - * - * @param paths The paths across which the local RocksDB database files will be spread. - */ - public void setDbStoragePaths(String... 
paths) { - if (paths == null) { - localRocksDbDirectories = null; - } - else if (paths.length == 0) { - throw new IllegalArgumentException("empty paths"); - } - else { - Path[] pp = new Path[paths.length]; - - for (int i = 0; i < paths.length; i++) { - if (paths[i] == null) { - throw new IllegalArgumentException("null path"); - } - - pp[i] = new Path(paths[i]); - String scheme = pp[i].toUri().getScheme(); - if (scheme != null && !scheme.equalsIgnoreCase("file")) { - throw new IllegalArgumentException("Path " + paths[i] + " has a non local scheme"); - } - } - - localRocksDbDirectories = pp; - } - } - - /** - * - * @return The configured DB storage paths, or null, if none were configured. - */ - public String[] getDbStoragePaths() { - if (localRocksDbDirectories == null) { - return null; - } else { - String[] paths = new String[localRocksDbDirectories.length]; - for (int i = 0; i < paths.length; i++) { - paths[i] = localRocksDbDirectories[i].toString(); - } - return paths; - } - } - - /** - * Gets whether incremental checkpoints are enabled for this state backend. - */ - public boolean isIncrementalCheckpointsEnabled() { - if (enableIncrementalCheckpointing != null) { - return enableIncrementalCheckpointing; - } - else { - return CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue(); - } - } - - // ------------------------------------------------------------------------ - // Parametrize with RocksDB Options - // ------------------------------------------------------------------------ - - /** - * Sets the predefined options for RocksDB. - * - * <p>If a user-defined options factory is set (via {@link #setOptions(OptionsFactory)}), - * then the options from the factory are applied on top of the here specified - * predefined options. - * - * @param options The options to set (must not be null). 
- */ - public void setPredefinedOptions(PredefinedOptions options) { - predefinedOptions = checkNotNull(options); - } - - /** - * Gets the currently set predefined options for RocksDB. - * The default options (if nothing was set via {@link #setPredefinedOptions(PredefinedOptions)}) - * are {@link PredefinedOptions#DEFAULT}. - * - * <p>If a user-defined options factory is set (via {@link #setOptions(OptionsFactory)}), - * then the options from the factory are applied on top of the predefined options. - * - * @return The currently set predefined options for RocksDB. - */ - public PredefinedOptions getPredefinedOptions() { - return predefinedOptions; - } - - /** - * Sets {@link org.rocksdb.Options} for the RocksDB instances. - * Because the options are not serializable and hold native code references, - * they must be specified through a factory. - * - * <p>The options created by the factory here are applied on top of the pre-defined - * options profile selected via {@link #setPredefinedOptions(PredefinedOptions)}. - * If the pre-defined options profile is the default - * ({@link PredefinedOptions#DEFAULT}), then the factory fully controls the RocksDB - * options. - * - * @param optionsFactory The options factory that lazily creates the RocksDB options. - */ - public void setOptions(OptionsFactory optionsFactory) { - this.optionsFactory = optionsFactory; - } - - /** - * Gets the options factory that lazily creates the RocksDB options. - * - * @return The options factory. - */ - public OptionsFactory getOptions() { - return optionsFactory; - } - - /** - * Gets the RocksDB {@link DBOptions} to be used for all RocksDB instances. 
- */ - public DBOptions getDbOptions() { - // initial options from pre-defined profile - DBOptions opt = predefinedOptions.createDBOptions(); - - // add user-defined options, if specified - if (optionsFactory != null) { - opt = optionsFactory.createDBOptions(opt); - } - - // add necessary default options - opt = opt.setCreateIfMissing(true); - - return opt; - } - - /** - * Gets the RocksDB {@link ColumnFamilyOptions} to be used for all RocksDB instances. - */ - public ColumnFamilyOptions getColumnOptions() { - // initial options from pre-defined profile - ColumnFamilyOptions opt = predefinedOptions.createColumnOptions(); - - // add user-defined options, if specified - if (optionsFactory != null) { - opt = optionsFactory.createColumnOptions(opt); - } - - return opt; - } - - // ------------------------------------------------------------------------ - // utilities - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "RocksDBStateBackend{" + - "checkpointStreamBackend=" + checkpointStreamBackend + - ", localRocksDbDirectories=" + Arrays.toString(localRocksDbDirectories) + - ", enableIncrementalCheckpointing=" + enableIncrementalCheckpointing + - '}'; - } - - // ------------------------------------------------------------------------ - // static library loading utilities - // ------------------------------------------------------------------------ - - private void ensureRocksDBIsLoaded(String tempDirectory) throws IOException { - synchronized (RocksDBStateBackend.class) { - if (!rocksDbInitialized) { - - final File tempDirParent = new File(tempDirectory).getAbsoluteFile(); - LOG.info("Attempting to load RocksDB native library and store it under '{}'", tempDirParent); - - Throwable lastException = null; - for (int attempt = 1; attempt <= ROCKSDB_LIB_LOADING_ATTEMPTS; attempt++) { - try { - // when multiple instances of this class and RocksDB exist in different - // class loaders, then we can 
see the following exception: - // "java.lang.UnsatisfiedLinkError: Native Library /path/to/temp/dir/librocksdbjni-linux64.so - // already loaded in another class loader" - - // to avoid that, we need to add a random element to the library file path - // (I know, seems like an unnecessary hack, since the JVM obviously can handle multiple - // instances of the same JNI library being loaded in different class loaders, but - // apparently not when coming from the same file path, so there we go) - - final File rocksLibFolder = new File(tempDirParent, "rocksdb-lib-" + new AbstractID()); - - // make sure the temp path exists - LOG.debug("Attempting to create RocksDB native library folder {}", rocksLibFolder); - // noinspection ResultOfMethodCallIgnored - rocksLibFolder.mkdirs(); - - // explicitly load the JNI dependency if it has not been loaded before - NativeLibraryLoader.getInstance().loadLibrary(rocksLibFolder.getAbsolutePath()); - - // this initialization here should validate that the loading succeeded - RocksDB.loadLibrary(); - - // seems to have worked - LOG.info("Successfully loaded RocksDB native library"); - rocksDbInitialized = true; - return; - } - catch (Throwable t) { - lastException = t; - LOG.debug("RocksDB JNI library loading attempt {} failed", attempt, t); - - // try to force RocksDB to attempt reloading the library - try { - resetRocksDBLoadedFlag(); - } catch (Throwable tt) { - LOG.debug("Failed to reset 'initialized' flag in RocksDB native code loader", tt); - } - } - } - - throw new IOException("Could not load the native RocksDB library", lastException); - } - } - } - - @VisibleForTesting - static void resetRocksDBLoadedFlag() throws Exception { - final Field initField = org.rocksdb.NativeLibraryLoader.class.getDeclaredField("initialized"); - initField.setAccessible(true); - initField.setBoolean(null, false); - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java deleted file mode 100644 index 94e15fa..0000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendFactory.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.runtime.state.StateBackendFactory; - -import java.io.IOException; - -/** - * A factory that creates an {@link org.apache.flink.contrib.streaming.state.RocksDBStateBackend} - * from a configuration. - */ -public class RocksDBStateBackendFactory implements StateBackendFactory<RocksDBStateBackend> { - - @Override - public RocksDBStateBackend createFromConfig(Configuration config) - throws IllegalConfigurationException, IOException { - - // we need to explicitly read the checkpoint directory here, because that - // is a required constructor parameter - final String checkpointDirURI = config.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY); - if (checkpointDirURI == null) { - throw new IllegalConfigurationException( - "Cannot create the RocksDB state backend: The configuration does not specify the " + - "checkpoint directory '" + CheckpointingOptions.CHECKPOINTS_DIRECTORY.key() + '\''); - } - - return new RocksDBStateBackend(checkpointDirURI).configure(config); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java deleted file mode 100644 index da21e8a..0000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.internal.InternalValueState; - -import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.RocksDBException; -import org.rocksdb.WriteOptions; - -import java.io.ByteArrayInputStream; -import java.io.IOException; - -/** - * {@link ValueState} implementation that stores state in RocksDB. - * - * @param <K> The type of the key. - * @param <N> The type of the namespace. - * @param <V> The type of value that the state state stores. - */ -public class RocksDBValueState<K, N, V> - extends AbstractRocksDBState<K, N, ValueState<V>, ValueStateDescriptor<V>, V> - implements InternalValueState<N, V> { - - /** Serializer for the values. */ - private final TypeSerializer<V> valueSerializer; - - /** - * We disable writes to the write-ahead-log here. 
We can't have these in the base class - * because JNI segfaults for some reason if they are. - */ - private final WriteOptions writeOptions; - - /** - * Creates a new {@code RocksDBValueState}. - * - * @param namespaceSerializer The serializer for the namespace. - * @param stateDesc The state identifier for the state. This contains name - * and can create a default state value. - */ - public RocksDBValueState(ColumnFamilyHandle columnFamily, - TypeSerializer<N> namespaceSerializer, - ValueStateDescriptor<V> stateDesc, - RocksDBKeyedStateBackend<K> backend) { - - super(columnFamily, namespaceSerializer, stateDesc, backend); - this.valueSerializer = stateDesc.getSerializer(); - - writeOptions = new WriteOptions(); - writeOptions.setDisableWAL(true); - } - - @Override - public V value() { - try { - writeCurrentKeyWithGroupAndNamespace(); - byte[] key = keySerializationStream.toByteArray(); - byte[] valueBytes = backend.db.get(columnFamily, key); - if (valueBytes == null) { - return stateDesc.getDefaultValue(); - } - return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes))); - } catch (IOException | RocksDBException e) { - throw new RuntimeException("Error while retrieving data from RocksDB.", e); - } - } - - @Override - public void update(V value) throws IOException { - if (value == null) { - clear(); - return; - } - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); - try { - writeCurrentKeyWithGroupAndNamespace(); - byte[] key = keySerializationStream.toByteArray(); - keySerializationStream.reset(); - valueSerializer.serialize(value, out); - backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); - } catch (Exception e) { - throw new RuntimeException("Error while adding data to RocksDB", e); - } - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java deleted file mode 100644 index bae1f81..0000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ /dev/null @@ -1,505 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.common.typeutils.base.VoidSerializer; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.core.testutils.OneShotLatch; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; -import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; -import org.apache.flink.runtime.execution.CancelTaskException; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.state.AbstractKeyedStateBackend; -import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream; -import org.apache.flink.runtime.state.CheckpointedStateScope; -import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.runtime.state.TestTaskStateManager; -import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; 
-import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.runtime.state.testutils.BackendForTestStream; -import org.apache.flink.runtime.state.testutils.BackendForTestStream.StreamFactory; -import org.apache.flink.runtime.state.testutils.TestCheckpointStreamFactory; -import org.apache.flink.runtime.taskmanager.CheckpointResponder; -import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; -import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; -import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; -import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; -import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.util.FutureUtil; -import org.apache.flink.util.IOUtils; -import org.apache.flink.util.TestLogger; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.io.File; -import java.io.IOException; -import java.lang.reflect.Field; -import java.util.Arrays; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RunnableFuture; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; - -/** - * Tests for asynchronous RocksDB Key/Value state checkpoints. 
- */ -@RunWith(PowerMockRunner.class) -@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"}) -@SuppressWarnings("serial") -public class RocksDBAsyncSnapshotTest extends TestLogger { - - /** - * Temporary fold for test. - */ - @Rule - public final TemporaryFolder temporaryFolder = new TemporaryFolder(); - - /** - * This ensures that asynchronous state handles are actually materialized asynchronously. - * - * <p>We use latches to block at various stages and see if the code still continues through - * the parts that are not asynchronous. If the checkpoint is not done asynchronously the - * test will simply lock forever. - */ - @Test - public void testFullyAsyncSnapshot() throws Exception { - - final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>( - OneInputStreamTask::new, - BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); - testHarness.setupOutputForSingletonOperatorChain(); - - testHarness.configureForKeyedStream(new KeySelector<String, String>() { - @Override - public String getKey(String value) throws Exception { - return value; - } - }, BasicTypeInfo.STRING_TYPE_INFO); - - StreamConfig streamConfig = testHarness.getStreamConfig(); - - File dbDir = temporaryFolder.newFolder(); - - RocksDBStateBackend backend = new RocksDBStateBackend(new MemoryStateBackend()); - backend.setDbStoragePath(dbDir.getAbsolutePath()); - - streamConfig.setStateBackend(backend); - - streamConfig.setStreamOperator(new AsyncCheckpointOperator()); - streamConfig.setOperatorID(new OperatorID()); - - final OneShotLatch delayCheckpointLatch = new OneShotLatch(); - final OneShotLatch ensureCheckpointLatch = new OneShotLatch(); - - CheckpointResponder checkpointResponderMock = new CheckpointResponder() { - - @Override - public void acknowledgeCheckpoint( - JobID jobID, - ExecutionAttemptID executionAttemptID, - long checkpointId, - CheckpointMetrics checkpointMetrics, - TaskStateSnapshot subtaskState) { - 
// block on the latch, to verify that triggerCheckpoint returns below, - // even though the async checkpoint would not finish - try { - delayCheckpointLatch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - boolean hasManagedKeyedState = false; - for (Map.Entry<OperatorID, OperatorSubtaskState> entry : subtaskState.getSubtaskStateMappings()) { - OperatorSubtaskState state = entry.getValue(); - if (state != null) { - hasManagedKeyedState |= state.getManagedKeyedState() != null; - } - } - - // should be one k/v state - assertTrue(hasManagedKeyedState); - - // we now know that the checkpoint went through - ensureCheckpointLatch.trigger(); - } - - @Override - public void declineCheckpoint( - JobID jobID, ExecutionAttemptID executionAttemptID, - long checkpointId, Throwable cause) { - - } - }; - - JobID jobID = new JobID(); - ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(0L, 0L); - TestTaskStateManager taskStateManagerTestMock = new TestTaskStateManager( - jobID, - executionAttemptID, - checkpointResponderMock); - - StreamMockEnvironment mockEnv = new StreamMockEnvironment( - testHarness.jobConfig, - testHarness.taskConfig, - testHarness.memorySize, - new MockInputSplitProvider(), - testHarness.bufferSize, - taskStateManagerTestMock); - - testHarness.invoke(mockEnv); - - final OneInputStreamTask<String, String> task = testHarness.getTask(); - - // wait for the task to be running - for (Field field: StreamTask.class.getDeclaredFields()) { - if (field.getName().equals("isRunning")) { - field.setAccessible(true); - while (!field.getBoolean(task)) { - Thread.sleep(10); - } - } - } - - task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forCheckpointWithDefaultLocation()); - - testHarness.processElement(new StreamRecord<>("Wohoo", 0)); - - // now we allow the checkpoint - delayCheckpointLatch.trigger(); - - // wait for the checkpoint to go through - ensureCheckpointLatch.await(); - - 
testHarness.endInput(); - - ExecutorService threadPool = task.getAsyncOperationsThreadPool(); - threadPool.shutdown(); - Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS)); - - testHarness.waitForTaskCompletion(); - if (mockEnv.wasFailedExternally()) { - fail("Unexpected exception during execution."); - } - } - - /** - * This tests ensures that canceling of asynchronous snapshots works as expected and does not block. - */ - @Test - public void testCancelFullyAsyncCheckpoints() throws Exception { - final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>( - OneInputStreamTask::new, - BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); - - testHarness.setupOutputForSingletonOperatorChain(); - - testHarness.configureForKeyedStream(value -> value, BasicTypeInfo.STRING_TYPE_INFO); - - StreamConfig streamConfig = testHarness.getStreamConfig(); - - File dbDir = temporaryFolder.newFolder(); - - // this is the proper instance that we need to call. - BlockerCheckpointStreamFactory blockerCheckpointStreamFactory = - new BlockerCheckpointStreamFactory(4 * 1024 * 1024) { - - int count = 1; - - @Override - public CheckpointStateOutputStream createCheckpointStateOutputStream(CheckpointedStateScope scope) throws Exception { - // we skip the first created stream, because it is used to checkpoint the timer service, which is - // currently not asynchronous. 
- if (count > 0) { - --count; - return new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize); - } else { - return super.createCheckpointStateOutputStream(scope); - } - } - }; - - // to avoid serialization of the above factory instance, we need to pass it in - // through a static variable - - StateBackend stateBackend = new BackendForTestStream(new StaticForwardFactory(blockerCheckpointStreamFactory)); - - RocksDBStateBackend backend = new RocksDBStateBackend(stateBackend); - backend.setDbStoragePath(dbDir.getAbsolutePath()); - - streamConfig.setStateBackend(backend); - - streamConfig.setStreamOperator(new AsyncCheckpointOperator()); - streamConfig.setOperatorID(new OperatorID()); - - TestTaskStateManager taskStateManagerTestMock = new TestTaskStateManager(); - - StreamMockEnvironment mockEnv = new StreamMockEnvironment( - testHarness.jobConfig, - testHarness.taskConfig, - testHarness.memorySize, - new MockInputSplitProvider(), - testHarness.bufferSize, - taskStateManagerTestMock); - - blockerCheckpointStreamFactory.setBlockerLatch(new OneShotLatch()); - blockerCheckpointStreamFactory.setWaiterLatch(new OneShotLatch()); - - testHarness.invoke(mockEnv); - - final OneInputStreamTask<String, String> task = testHarness.getTask(); - - // wait for the task to be running - for (Field field: StreamTask.class.getDeclaredFields()) { - if (field.getName().equals("isRunning")) { - field.setAccessible(true); - while (!field.getBoolean(task)) { - Thread.sleep(10); - } - } - } - - task.triggerCheckpoint( - new CheckpointMetaData(42, 17), - CheckpointOptions.forCheckpointWithDefaultLocation()); - - testHarness.processElement(new StreamRecord<>("Wohoo", 0)); - blockerCheckpointStreamFactory.getWaiterLatch().await(); - task.cancel(); - blockerCheckpointStreamFactory.getBlockerLatch().trigger(); - testHarness.endInput(); - Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed()); - - try { - ExecutorService threadPool = 
task.getAsyncOperationsThreadPool(); - threadPool.shutdown(); - Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS)); - testHarness.waitForTaskCompletion(); - - fail("Operation completed. Cancel failed."); - } catch (Exception expected) { - - Throwable cause = expected.getCause(); - - if (!(cause instanceof CancelTaskException)) { - fail("Unexpected exception: " + expected); - } - } - } - - /** - * Test that the snapshot files are cleaned up in case of a failure during the snapshot - * procedure. - */ - @Test - public void testCleanupOfSnapshotsInFailureCase() throws Exception { - long checkpointId = 1L; - long timestamp = 42L; - - Environment env = new DummyEnvironment("test task", 1, 0); - - final IOException testException = new IOException("Test exception"); - CheckpointStateOutputStream outputStream = spy(new FailingStream(testException)); - - RocksDBStateBackend backend = new RocksDBStateBackend((StateBackend) new MemoryStateBackend()); - - backend.setDbStoragePath(temporaryFolder.newFolder().toURI().toString()); - - AbstractKeyedStateBackend<Void> keyedStateBackend = backend.createKeyedStateBackend( - env, - new JobID(), - "test operator", - VoidSerializer.INSTANCE, - 1, - new KeyGroupRange(0, 0), - null); - - try { - - keyedStateBackend.restore(null); - - // register a state so that the state backend has to checkpoint something - keyedStateBackend.getPartitionedState( - "namespace", - StringSerializer.INSTANCE, - new ValueStateDescriptor<>("foobar", String.class)); - - RunnableFuture<KeyedStateHandle> snapshotFuture = keyedStateBackend.snapshot( - checkpointId, timestamp, - new TestCheckpointStreamFactory(() -> outputStream), - CheckpointOptions.forCheckpointWithDefaultLocation()); - - try { - FutureUtil.runIfNotDoneAndGet(snapshotFuture); - fail("Expected an exception to be thrown here."); - } catch (ExecutionException e) { - Assert.assertEquals(testException, e.getCause()); - } - - verify(outputStream).close(); - } finally { - 
IOUtils.closeQuietly(keyedStateBackend); - keyedStateBackend.dispose(); - } - } - - @Test - public void testConsistentSnapshotSerializationFlagsAndMasks() { - - Assert.assertEquals(0xFFFF, RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK); - Assert.assertEquals(0x80, RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); - - byte[] expectedKey = new byte[] {42, 42}; - byte[] modKey = expectedKey.clone(); - - Assert.assertFalse( - RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey)); - - RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.setMetaDataFollowsFlagInKey(modKey); - Assert.assertTrue(RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey)); - - RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(modKey); - Assert.assertFalse( - RocksDBKeyedStateBackend.RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(modKey)); - - Assert.assertTrue(Arrays.equals(expectedKey, modKey)); - } - - // ------------------------------------------------------------------------ - - private static class AsyncCheckpointOperator - extends AbstractStreamOperator<String> - implements OneInputStreamOperator<String, String> { - - @Override - public void open() throws Exception { - super.open(); - - // also get the state in open, this way we are sure that it was created before - // we trigger the test checkpoint - ValueState<String> state = getPartitionedState( - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - new ValueStateDescriptor<>("count", StringSerializer.INSTANCE)); - - } - - @Override - public void processElement(StreamRecord<String> element) throws Exception { - // we also don't care - - ValueState<String> state = getPartitionedState( - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - new ValueStateDescriptor<>("count", StringSerializer.INSTANCE)); - - state.update(element.getValue()); - } - } - - // 
------------------------------------------------------------------------ - // failing stream - // ------------------------------------------------------------------------ - - private static class StaticForwardFactory implements StreamFactory { - - static CheckpointStreamFactory factory; - - StaticForwardFactory(CheckpointStreamFactory factory) { - StaticForwardFactory.factory = factory; - } - - @Override - public CheckpointStateOutputStream get() throws Exception { - return factory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); - } - } - - private static class FailingStream extends CheckpointStateOutputStream { - - private final IOException testException; - - FailingStream(IOException testException) { - this.testException = testException; - } - - @Override - public StreamStateHandle closeAndGetHandle() throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public long getPos() throws IOException { - throw new UnsupportedOperationException(); - } - - @Override - public void write(int b) throws IOException { - throw testException; - } - - @Override - public void flush() throws IOException { - throw testException; - } - - @Override - public void sync() throws IOException { - throw testException; - } - - @Override - public void close() throws IOException { - throw new UnsupportedOperationException(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java deleted file mode 100644 index 565f27d..0000000 --- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBInitResetTest.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import org.junit.Test; - -/** - * This test checks that the RocksDB native code loader still responds to resetting the init flag. 
- */ -public class RocksDBInitResetTest { - - @Test - public void testResetInitFlag() throws Exception { - RocksDBStateBackend.resetRocksDBLoadedFlag(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java deleted file mode 100644 index 1d14f6e..0000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.rocksdb.ColumnFamilyDescriptor; -import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksIterator; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Random; - -/** - * Tests for the RocksDBMergeIterator. - */ -public class RocksDBMergeIteratorTest { - - private static final int NUM_KEY_VAL_STATES = 50; - private static final int MAX_NUM_KEYS = 20; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Test - public void testEmptyMergeIterator() throws IOException { - RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator = - new RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.EMPTY_LIST, 2); - Assert.assertFalse(emptyIterator.isValid()); - } - - @Test - public void testMergeIteratorByte() throws Exception { - Assert.assertTrue(MAX_NUM_KEYS <= Byte.MAX_VALUE); - - testMergeIterator(Byte.MAX_VALUE); - } - - @Test - public void testMergeIteratorShort() throws Exception { - Assert.assertTrue(MAX_NUM_KEYS <= Byte.MAX_VALUE); - - testMergeIterator(Short.MAX_VALUE); - } - - public void testMergeIterator(int maxParallelism) throws Exception { - Random random = new Random(1234); - - RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath()); - try { - List<Tuple2<RocksIterator, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>(); - List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>(); - - int totalKeysExpected = 0; - - for (int c = 0; c < 
NUM_KEY_VAL_STATES; ++c) { - ColumnFamilyHandle handle = rocksDB.createColumnFamily( - new ColumnFamilyDescriptor(("column-" + c).getBytes(ConfigConstants.DEFAULT_CHARSET))); - - ByteArrayOutputStreamWithPos bos = new ByteArrayOutputStreamWithPos(); - DataOutputStream dos = new DataOutputStream(bos); - - int numKeys = random.nextInt(MAX_NUM_KEYS + 1); - - for (int i = 0; i < numKeys; ++i) { - if (maxParallelism <= Byte.MAX_VALUE) { - dos.writeByte(i); - } else { - dos.writeShort(i); - } - dos.writeInt(i); - byte[] key = bos.toByteArray(); - byte[] val = new byte[]{42}; - rocksDB.put(handle, key, val); - - bos.reset(); - } - columnFamilyHandlesWithKeyCount.add(new Tuple2<>(handle, numKeys)); - totalKeysExpected += numKeys; - } - - int id = 0; - for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle : columnFamilyHandlesWithKeyCount) { - rocksIteratorsWithKVStateId.add(new Tuple2<>(rocksDB.newIterator(columnFamilyHandle.f0), id)); - ++id; - } - - RocksDBKeyedStateBackend.RocksDBMergeIterator mergeIterator = new RocksDBKeyedStateBackend.RocksDBMergeIterator(rocksIteratorsWithKVStateId, maxParallelism <= Byte.MAX_VALUE ? 1 : 2); - - int prevKVState = -1; - int prevKey = -1; - int prevKeyGroup = -1; - int totalKeysActual = 0; - - while (mergeIterator.isValid()) { - ByteBuffer bb = ByteBuffer.wrap(mergeIterator.key()); - - int keyGroup = maxParallelism > Byte.MAX_VALUE ? 
bb.getShort() : bb.get(); - int key = bb.getInt(); - - Assert.assertTrue(keyGroup >= prevKeyGroup); - Assert.assertTrue(key >= prevKey); - Assert.assertEquals(prevKeyGroup != keyGroup, mergeIterator.isNewKeyGroup()); - Assert.assertEquals(prevKVState != mergeIterator.kvStateId(), mergeIterator.isNewKeyValueState()); - - prevKeyGroup = keyGroup; - prevKVState = mergeIterator.kvStateId(); - - //System.out.println(keyGroup + " " + key + " " + mergeIterator.kvStateId()); - mergeIterator.next(); - ++totalKeysActual; - } - - Assert.assertEquals(totalKeysExpected, totalKeysActual); - - for (Tuple2<ColumnFamilyHandle, Integer> handleWithCount : columnFamilyHandlesWithKeyCount) { - rocksDB.dropColumnFamily(handleWithCount.f0); - } - } finally { - rocksDB.close(); - } - } - -}