Repository: flink Updated Branches: refs/heads/master e96f28bd3 -> 1e315f0dd
[FLINK-8639][State Backends] Fix always need to seek multiple times when iterator RocksDBMapState This closes #5465. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e315f0d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e315f0d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e315f0d Branch: refs/heads/master Commit: 1e315f0dd25169a029b6087fabb15b6709881b8c Parents: e96f28b Author: summerleafs <summerle...@163.com> Authored: Mon Feb 12 22:28:04 2018 +0800 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Thu Feb 22 15:58:50 2018 +0100 ---------------------------------------------------------------------- .../streaming/state/AbstractRocksDBState.java | 47 -------------------- .../streaming/state/RocksDBMapState.java | 21 ++++++--- 2 files changed, 14 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1e315f0d/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java index 969a1fc..64b6d48 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java @@ -21,10 +21,7 @@ import org.apache.flink.api.common.state.State; import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; -import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; @@ -220,48 +217,4 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta value >>>= 8; } while (value != 0); } - - protected Tuple3<Integer, K, N> readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException { - int keyGroup = readKeyGroup(inputView); - K key = readKey(inputStream, inputView); - N namespace = readNamespace(inputStream, inputView); - - return new Tuple3<>(keyGroup, key, namespace); - } - - private int readKeyGroup(DataInputView inputView) throws IOException { - int keyGroup = 0; - for (int i = 0; i < backend.getKeyGroupPrefixBytes(); ++i) { - keyGroup <<= 8; - keyGroup |= (inputView.readByte() & 0xFF); - } - return keyGroup; - } - - private K readKey(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException { - int beforeRead = inputStream.getPosition(); - K key = backend.getKeySerializer().deserialize(inputView); - if (ambiguousKeyPossible) { - int length = inputStream.getPosition() - beforeRead; - readVariableIntBytes(inputView, length); - } - return key; - } - - private N readNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException { - int beforeRead = inputStream.getPosition(); - N namespace = namespaceSerializer.deserialize(inputView); - if (ambiguousKeyPossible) { - int length = inputStream.getPosition() - beforeRead; - readVariableIntBytes(inputView, length); - } - return namespace; - } - - private void readVariableIntBytes(DataInputView inputView, int value) throws IOException { - do { - inputView.readByte(); - value >>>= 8; - } while (value != 0); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/1e315f0d/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java ---------------------------------------------------------------------- diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index e8c34cc..6b7177b 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -39,6 +39,8 @@ import org.rocksdb.WriteOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; + import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; @@ -72,6 +74,9 @@ public class RocksDBMapState<K, N, UK, UV> */ private final WriteOptions writeOptions; + /** The offset of User Key offset in raw key bytes. */ + private int userKeyOffset; + /** * Creates a new {@code RocksDBMapState}. * @@ -262,12 +267,13 @@ public class RocksDBMapState<K, N, UK, UV> private byte[] serializeCurrentKeyAndNamespace() throws IOException { writeCurrentKeyWithGroupAndNamespace(); + userKeyOffset = keySerializationStream.getPosition(); return keySerializationStream.toByteArray(); } private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) throws IOException { - writeCurrentKeyWithGroupAndNamespace(); + serializeCurrentKeyAndNamespace(); userKeySerializer.serialize(userKey, keySerializationDataOutputView); return keySerializationStream.toByteArray(); @@ -290,7 +296,7 @@ public class RocksDBMapState<K, N, UK, UV> ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawKeyBytes); DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais); - readKeyWithGroupAndNamespace(bais, in); + in.skipBytes(userKeyOffset); return userKeySerializer.deserialize(in); } @@ -327,7 +333,10 @@ public class RocksDBMapState<K, N, UK, UV> private UK userKey = null; private UV userValue = null; - RocksDBMapEntry(final RocksDB db, final byte[] rawKeyBytes, final byte[] rawValueBytes) { + RocksDBMapEntry( + @Nonnull final RocksDB db, + @Nonnull final byte[] rawKeyBytes, + @Nonnull final byte[] rawValueBytes) { this.db = db; this.rawKeyBytes = rawKeyBytes; @@ -400,7 +409,6 @@ public class RocksDBMapState<K, N, UK, UV> /** An auxiliary utility to scan all entries under the given key. */ private abstract class RocksDBMapIterator<T> implements Iterator<T> { - static final int CACHE_SIZE_BASE = 1; static final int CACHE_SIZE_LIMIT = 128; /** The db where data resides. */ @@ -481,7 +489,6 @@ public class RocksDBMapState<K, N, UK, UV> */ RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1); byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes); - int numEntries = (lastEntry == null ? CACHE_SIZE_BASE : Math.min(cacheEntries.size() * 2, CACHE_SIZE_LIMIT)); cacheEntries.clear(); cacheIndex = 0; @@ -502,7 +509,7 @@ public class RocksDBMapState<K, N, UK, UV> break; } - if (cacheEntries.size() >= numEntries) { + if (cacheEntries.size() >= CACHE_SIZE_LIMIT) { break; } @@ -520,7 +527,7 @@ public class RocksDBMapState<K, N, UK, UV> return false; } - for (int i = 0; i < keyPrefixBytes.length; ++i) { + for (int i = keyPrefixBytes.length; --i >= backend.getKeyGroupPrefixBytes(); ) { if (rawKeyBytes[i] != keyPrefixBytes[i]) { return false; }