Repository: flink
Updated Branches:
  refs/heads/master e96f28bd3 -> 1e315f0dd


[FLINK-8639][State Backends] Fix RocksDBMapState iterator always needing to seek
multiple times

This closes #5465.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e315f0d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e315f0d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e315f0d

Branch: refs/heads/master
Commit: 1e315f0dd25169a029b6087fabb15b6709881b8c
Parents: e96f28b
Author: summerleafs <summerle...@163.com>
Authored: Mon Feb 12 22:28:04 2018 +0800
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu Feb 22 15:58:50 2018 +0100

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   | 47 --------------------
 .../streaming/state/RocksDBMapState.java        | 21 ++++++---
 2 files changed, 14 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1e315f0d/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 969a1fc..64b6d48 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -21,10 +21,7 @@ import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
@@ -220,48 +217,4 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
                        value >>>= 8;
                } while (value != 0);
        }
-
-       protected Tuple3<Integer, K, N> 
readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, 
DataInputView inputView) throws IOException {
-               int keyGroup = readKeyGroup(inputView);
-               K key = readKey(inputStream, inputView);
-               N namespace = readNamespace(inputStream, inputView);
-
-               return new Tuple3<>(keyGroup, key, namespace);
-       }
-
-       private int readKeyGroup(DataInputView inputView) throws IOException {
-               int keyGroup = 0;
-               for (int i = 0; i < backend.getKeyGroupPrefixBytes(); ++i) {
-                       keyGroup <<= 8;
-                       keyGroup |= (inputView.readByte() & 0xFF);
-               }
-               return keyGroup;
-       }
-
-       private K readKey(ByteArrayInputStreamWithPos inputStream, 
DataInputView inputView) throws IOException {
-               int beforeRead = inputStream.getPosition();
-               K key = backend.getKeySerializer().deserialize(inputView);
-               if (ambiguousKeyPossible) {
-                       int length = inputStream.getPosition() - beforeRead;
-                       readVariableIntBytes(inputView, length);
-               }
-               return key;
-       }
-
-       private N readNamespace(ByteArrayInputStreamWithPos inputStream, 
DataInputView inputView) throws IOException {
-               int beforeRead = inputStream.getPosition();
-               N namespace = namespaceSerializer.deserialize(inputView);
-               if (ambiguousKeyPossible) {
-                       int length = inputStream.getPosition() - beforeRead;
-                       readVariableIntBytes(inputView, length);
-               }
-               return namespace;
-       }
-
-       private void readVariableIntBytes(DataInputView inputView, int value) 
throws IOException {
-               do {
-                       inputView.readByte();
-                       value >>>= 8;
-               } while (value != 0);
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1e315f0d/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index e8c34cc..6b7177b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -39,6 +39,8 @@ import org.rocksdb.WriteOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -72,6 +74,9 @@ public class RocksDBMapState<K, N, UK, UV>
         */
        private final WriteOptions writeOptions;
 
+       /** The offset of User Key offset in raw key bytes. */
+       private int userKeyOffset;
+
        /**
         * Creates a new {@code RocksDBMapState}.
         *
@@ -262,12 +267,13 @@ public class RocksDBMapState<K, N, UK, UV>
 
        private byte[] serializeCurrentKeyAndNamespace() throws IOException {
                writeCurrentKeyWithGroupAndNamespace();
+               userKeyOffset = keySerializationStream.getPosition();
 
                return keySerializationStream.toByteArray();
        }
 
        private byte[] serializeUserKeyWithCurrentKeyAndNamespace(UK userKey) 
throws IOException {
-               writeCurrentKeyWithGroupAndNamespace();
+               serializeCurrentKeyAndNamespace();
                userKeySerializer.serialize(userKey, 
keySerializationDataOutputView);
 
                return keySerializationStream.toByteArray();
@@ -290,7 +296,7 @@ public class RocksDBMapState<K, N, UK, UV>
                ByteArrayInputStreamWithPos bais = new 
ByteArrayInputStreamWithPos(rawKeyBytes);
                DataInputViewStreamWrapper in = new 
DataInputViewStreamWrapper(bais);
 
-               readKeyWithGroupAndNamespace(bais, in);
+               in.skipBytes(userKeyOffset);
 
                return userKeySerializer.deserialize(in);
        }
@@ -327,7 +333,10 @@ public class RocksDBMapState<K, N, UK, UV>
                private UK userKey = null;
                private UV userValue = null;
 
-               RocksDBMapEntry(final RocksDB db, final byte[] rawKeyBytes, 
final byte[] rawValueBytes) {
+               RocksDBMapEntry(
+                       @Nonnull final RocksDB db,
+                       @Nonnull final byte[] rawKeyBytes,
+                       @Nonnull final byte[] rawValueBytes) {
                        this.db = db;
 
                        this.rawKeyBytes = rawKeyBytes;
@@ -400,7 +409,6 @@ public class RocksDBMapState<K, N, UK, UV>
        /** An auxiliary utility to scan all entries under the given key. */
        private abstract class RocksDBMapIterator<T> implements Iterator<T> {
 
-               static final int CACHE_SIZE_BASE = 1;
                static final int CACHE_SIZE_LIMIT = 128;
 
                /** The db where data resides. */
@@ -481,7 +489,6 @@ public class RocksDBMapState<K, N, UK, UV>
                         */
                        RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? 
null : cacheEntries.get(cacheEntries.size() - 1);
                        byte[] startBytes = (lastEntry == null ? keyPrefixBytes 
: lastEntry.rawKeyBytes);
-                       int numEntries = (lastEntry == null ? CACHE_SIZE_BASE : 
Math.min(cacheEntries.size() * 2, CACHE_SIZE_LIMIT));
 
                        cacheEntries.clear();
                        cacheIndex = 0;
@@ -502,7 +509,7 @@ public class RocksDBMapState<K, N, UK, UV>
                                        break;
                                }
 
-                               if (cacheEntries.size() >= numEntries) {
+                               if (cacheEntries.size() >= CACHE_SIZE_LIMIT) {
                                        break;
                                }
 
@@ -520,7 +527,7 @@ public class RocksDBMapState<K, N, UK, UV>
                                return false;
                        }
 
-                       for (int i = 0; i < keyPrefixBytes.length; ++i) {
+                       for (int i = keyPrefixBytes.length; --i >= 
backend.getKeyGroupPrefixBytes(); ) {
                                if (rawKeyBytes[i] != keyPrefixBytes[i]) {
                                        return false;
                                }

Reply via email to