[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5518 ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169993867 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java --- @@ -61,8 +61,9 @@ * over it keys are not supported. * @param state State variable for which existing keys will be returned. * @param namespace Namespace for which existing keys will be returned. +* @param namespaceSerializer the serializer for the namespace. */ -Stream getKeys(String state, N namespace); +Stream getKeys(String state, N namespace, TypeSerializer namespaceSerializer); --- End diff -- Checked! I was the one who misunderstood. `getPartitionedState` confused me: it only resets the `namespace`, not the `namespaceSerializer`, for the `previous` state. Please just ignore my comment... Addressing this. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169991823 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -207,6 +208,9 @@ /** Unique ID of this backend. */ private UUID backendUID; + /** The byte array for namespace serialization in getKeys(). */ + private final ByteArrayOutputStreamWithPos namespaceOutputStream; --- End diff -- Ok, sounds good 👍 ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169991617 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java --- @@ -61,8 +61,9 @@ * over it keys are not supported. * @param state State variable for which existing keys will be returned. * @param namespace Namespace for which existing keys will be returned. +* @param namespaceSerializer the serializer for the namespace. */ -Stream getKeys(String state, N namespace); +Stream getKeys(String state, N namespace, TypeSerializer namespaceSerializer); --- End diff -- Currently, each registered state has its own column family, so that should be ok. What made you think that this does not hold? (asking in case you found something that should be fixed) ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169991126 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -253,22 +257,61 @@ public RocksDBKeyedStateBackend( this.restoredKvStateMetaInfos = new HashMap<>(); this.materializedSstFiles = new TreeMap<>(); this.backendUID = UUID.randomUUID(); + this.namespaceOutputStream = new ByteArrayOutputStreamWithPos(8); LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } @Override - public Stream getKeys(String state, N namespace) { + public Stream getKeys(String state, N namespace, TypeSerializer namespaceSerializer) { Tuple2columnInfo = kvStateInformation.get(state); if (columnInfo == null) { return Stream.empty(); } - RocksIterator iterator = db.newIterator(columnInfo.f0); - iterator.seekToFirst(); + RocksIterator iterator = null; + try { + iterator = db.newIterator(columnInfo.f0); + iterator.seekToFirst(); + + boolean ambiguousKeyPossible = AbstractRocksDBState.AbstractRocksDBUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer); + final byte[] nameSpaceBytes; - Iterable iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes); - Stream targetStream = StreamSupport.stream(iterable.spliterator(), false); - return targetStream.onClose(iterator::close); + try { + namespaceOutputStream.reset(); + AbstractRocksDBState.AbstractRocksDBUtils.writeNameSpace( + namespace, + namespaceSerializer, + namespaceOutputStream, + new DataOutputViewStreamWrapper(namespaceOutputStream), + ambiguousKeyPossible); + nameSpaceBytes = namespaceOutputStream.toByteArray(); + } catch (IOException ex) { + throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex); + } + + final RocksIteratorWrapper iteratorWrapper = new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, + ambiguousKeyPossible, nameSpaceBytes); + + Stream targetStream = StreamSupport.stream(((Iterable)()->iteratorWrapper).spliterator(), false); --- End diff -- 👍 ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169991080 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java --- @@ -161,107 +160,131 @@ protected void writeKeyWithGroupAndNamespace( Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context."); keySerializationStream.reset(); - writeKeyGroup(keyGroup, keySerializationDataOutputView); - writeKey(key, keySerializationStream, keySerializationDataOutputView); - writeNameSpace(namespace, keySerializationStream, keySerializationDataOutputView); + AbstractRocksDBUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView); + AbstractRocksDBUtils.writeKey(key, backend.getKeySerializer(), keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible); + AbstractRocksDBUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible); } - private void writeKeyGroup( - int keyGroup, - DataOutputView keySerializationDateDataOutputView) throws IOException { - for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) { - keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3)); - } + protected Tuple3readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException { + int keyGroup = AbstractRocksDBUtils.readKeyGroup(backend.getKeyGroupPrefixBytes(), inputView); + K key = AbstractRocksDBUtils.readKey(backend.getKeySerializer(), inputStream, inputView, ambiguousKeyPossible); + N namespace = AbstractRocksDBUtils.readNamespace(namespaceSerializer, inputStream, inputView, ambiguousKeyPossible); + + return new Tuple3<>(keyGroup, key, namespace); } - private void writeKey( - K key, - ByteArrayOutputStreamWithPos keySerializationStream, - DataOutputView 
keySerializationDataOutputView) throws IOException { - //write key - int beforeWrite = keySerializationStream.getPosition(); - backend.getKeySerializer().serialize(key, keySerializationDataOutputView); - - if (ambiguousKeyPossible) { - //write size of key - writeLengthFrom(beforeWrite, keySerializationStream, - keySerializationDataOutputView); + /** +* Utils for RocksDB state serialization and deserialization. +*/ + static class AbstractRocksDBUtils { --- End diff -- Addressing ---
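The removed `writeKeyGroup` loop in the diff above writes the key group as a fixed-width, big-endian prefix of `keyGroupPrefixBytes` bytes. A minimal standalone sketch of that encoding follows. The class name `KeyGroupPrefixSketch` and the plain `byte[]` signatures are assumptions made for illustration (the PR's `AbstractRocksDBUtils` methods operate on Flink's `DataOutputView`/`DataInputView`), and the read side is only an assumed inverse, since the body of `readKeyGroup` is not shown in the diff.

```java
public class KeyGroupPrefixSketch {

    /**
     * Encodes the key group as a big-endian prefix, mirroring the loop
     * from the diff: writeByte(keyGroup >>> (i << 3)) for i = n-1 .. 0.
     */
    static byte[] writeKeyGroup(int keyGroup, int keyGroupPrefixBytes) {
        byte[] out = new byte[keyGroupPrefixBytes];
        int pos = 0;
        for (int i = keyGroupPrefixBytes; --i >= 0;) {
            out[pos++] = (byte) (keyGroup >>> (i << 3));
        }
        return out;
    }

    /** Assumed inverse: reads the prefix back, most significant byte first. */
    static int readKeyGroup(byte[] data, int keyGroupPrefixBytes) {
        int keyGroup = 0;
        for (int i = 0; i < keyGroupPrefixBytes; i++) {
            keyGroup = (keyGroup << 8) | (data[i] & 0xFF);
        }
        return keyGroup;
    }

    public static void main(String[] args) {
        // Key group 300 does not fit into one byte, so two prefix bytes are used.
        byte[] prefix = writeKeyGroup(300, 2);
        System.out.println(prefix[0] + " " + prefix[1]);
        System.out.println(readKeyGroup(prefix, 2));
    }
}
```

Because the prefix is fixed-width and big-endian, RocksDB's bytewise key ordering groups all entries of one key group together.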
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169991026 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +2034,87 @@ public int numStateEntries() { return count; } - private static class RocksIteratorWrapper implements Iterator { + /** +* This class is not thread safety. --- End diff -- 👍 ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169990980 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +2034,87 @@ public int numStateEntries() { return count; } - private static class RocksIteratorWrapper implements Iterator { + /** +* This class is not thread safety. +*/ + static class RocksIteratorWrapper implements Iterator, AutoCloseable { private final RocksIterator iterator; private final String state; private final TypeSerializer keySerializer; private final int keyGroupPrefixBytes; + private final byte[] namespaceBytes; + private final boolean ambiguousKeyPossible; + private K nextKey; public RocksIteratorWrapper( RocksIterator iterator, String state, TypeSerializer keySerializer, - int keyGroupPrefixBytes) { + int keyGroupPrefixBytes, + boolean ambiguousKeyPossible, + byte[] namespaceBytes) { this.iterator = Preconditions.checkNotNull(iterator); this.state = Preconditions.checkNotNull(state); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes); + this.nextKey = null; + this.ambiguousKeyPossible = ambiguousKeyPossible; } @Override public boolean hasNext() { - return iterator.isValid(); + final int namespaceBytesLength = namespaceBytes.length; + final int basicLength = namespaceBytesLength + keyGroupPrefixBytes; + while (nextKey == null && iterator.isValid()) { + try { + byte[] key = iterator.key(); + if (key.length >= basicLength) { --- End diff -- Addressing ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169990779 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksIterator; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.function.Function; + +import static org.mockito.Mockito.mock; + +/** + * Tests for the RocksIteratorWrapper. 
+ */ +public class RocksDBRocksIteratorWrapperTest { + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + @Test + public void testIterator() throws Exception{ + + // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == false + testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> i); + + // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == true + testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> String.valueOf(i)); + + // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == false + testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> i); + + // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == true + testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> String.valueOf(i)); + } + +void testIteratorHelper( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + int maxKeyGroupNumber, + FunctiongetKeyFunc) throws Exception { + + String testStateName = "aha"; + String namespace = "ns"; + + String dbPath = tmp.newFolder().getAbsolutePath(); + String checkpointPath = tmp.newFolder().toURI().toString(); + RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), true); + backend.setDbStoragePath(dbPath); + + Environment env = new DummyEnvironment("TestTask", 1, 0); + + RocksDBKeyedStateBackend keyedStateBackend = (RocksDBKeyedStateBackend) backend.createKeyedStateBackend( + env, + new JobID(), + "Test", + keySerializer, + maxKeyGroupNumber, + new KeyGroupRange(0, maxKeyGroupNumber - 1), + mock(TaskKvStateRegistry.class)); + + keyedStateBackend.restore(null); + + ValueState testState = keyedStateBackend.getPartitionedState( + namespace, +
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169990722 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksIterator; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.function.Function; + +import static org.mockito.Mockito.mock; + +/** + * Tests for the RocksIteratorWrapper. 
+ */ +public class RocksDBRocksIteratorWrapperTest { + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + @Test + public void testIterator() throws Exception{ + + // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == false + testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> i); + + // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == true + testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> String.valueOf(i)); + + // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == false + testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> i); + + // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == true + testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> String.valueOf(i)); + } + +void testIteratorHelper( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + int maxKeyGroupNumber, + FunctiongetKeyFunc) throws Exception { + + String testStateName = "aha"; + String namespace = "ns"; + + String dbPath = tmp.newFolder().getAbsolutePath(); + String checkpointPath = tmp.newFolder().toURI().toString(); + RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), true); + backend.setDbStoragePath(dbPath); + + Environment env = new DummyEnvironment("TestTask", 1, 0); + + RocksDBKeyedStateBackend keyedStateBackend = (RocksDBKeyedStateBackend) backend.createKeyedStateBackend( + env, + new JobID(), + "Test", + keySerializer, + maxKeyGroupNumber, + new KeyGroupRange(0, maxKeyGroupNumber - 1), + mock(TaskKvStateRegistry.class)); + + keyedStateBackend.restore(null); + + ValueState testState = keyedStateBackend.getPartitionedState( + namespace, +
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169990518 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -253,22 +257,61 @@ public RocksDBKeyedStateBackend( this.restoredKvStateMetaInfos = new HashMap<>(); this.materializedSstFiles = new TreeMap<>(); this.backendUID = UUID.randomUUID(); + this.namespaceOutputStream = new ByteArrayOutputStreamWithPos(8); LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } @Override - public Stream getKeys(String state, N namespace) { + public Stream getKeys(String state, N namespace, TypeSerializer namespaceSerializer) { Tuple2columnInfo = kvStateInformation.get(state); if (columnInfo == null) { return Stream.empty(); } - RocksIterator iterator = db.newIterator(columnInfo.f0); - iterator.seekToFirst(); + RocksIterator iterator = null; + try { + iterator = db.newIterator(columnInfo.f0); + iterator.seekToFirst(); + + boolean ambiguousKeyPossible = AbstractRocksDBState.AbstractRocksDBUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer); + final byte[] nameSpaceBytes; - Iterable iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes); - Stream targetStream = StreamSupport.stream(iterable.spliterator(), false); - return targetStream.onClose(iterator::close); + try { + namespaceOutputStream.reset(); + AbstractRocksDBState.AbstractRocksDBUtils.writeNameSpace( + namespace, + namespaceSerializer, + namespaceOutputStream, + new DataOutputViewStreamWrapper(namespaceOutputStream), + ambiguousKeyPossible); + nameSpaceBytes = namespaceOutputStream.toByteArray(); + } catch (IOException ex) { + throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex); + } + + final RocksIteratorWrapper iteratorWrapper = new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, + ambiguousKeyPossible, nameSpaceBytes); + + Stream targetStream = StreamSupport.stream(((Iterable)()->iteratorWrapper).spliterator(), false); + return targetStream.onClose(() -> { + try { + iteratorWrapper.close(); + } catch (Exception ex) { + LOG.warn("Release RocksIteratorWrapper failed.", ex); + } + }); + } catch (Exception ex) { --- End diff -- 👍 I think I misunderstood your point earlier... addressing ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169990649 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksIterator; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.function.Function; + +import static org.mockito.Mockito.mock; + +/** + * Tests for the RocksIteratorWrapper. 
+ */ +public class RocksDBRocksIteratorWrapperTest { + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + @Test + public void testIterator() throws Exception{ + + // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == false + testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> i); + + // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == true + testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> String.valueOf(i)); + + // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == false + testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> i); + + // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == true + testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> String.valueOf(i)); + } + +void testIteratorHelper( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + int maxKeyGroupNumber, + FunctiongetKeyFunc) throws Exception { + + String testStateName = "aha"; + String namespace = "ns"; + + String dbPath = tmp.newFolder().getAbsolutePath(); + String checkpointPath = tmp.newFolder().toURI().toString(); + RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), true); + backend.setDbStoragePath(dbPath); + + Environment env = new DummyEnvironment("TestTask", 1, 0); + + RocksDBKeyedStateBackend keyedStateBackend = (RocksDBKeyedStateBackend) backend.createKeyedStateBackend( --- End diff -- Nice catch! Addressing ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169990041 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +2034,87 @@ public int numStateEntries() { return count; } - private static class RocksIteratorWrapper implements Iterator { + /** +* This class is not thread safety. +*/ + static class RocksIteratorWrapper implements Iterator, AutoCloseable { private final RocksIterator iterator; private final String state; private final TypeSerializer keySerializer; private final int keyGroupPrefixBytes; + private final byte[] namespaceBytes; + private final boolean ambiguousKeyPossible; + private K nextKey; public RocksIteratorWrapper( RocksIterator iterator, String state, TypeSerializer keySerializer, - int keyGroupPrefixBytes) { + int keyGroupPrefixBytes, + boolean ambiguousKeyPossible, + byte[] namespaceBytes) { this.iterator = Preconditions.checkNotNull(iterator); this.state = Preconditions.checkNotNull(state); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes); + this.nextKey = null; + this.ambiguousKeyPossible = ambiguousKeyPossible; } @Override public boolean hasNext() { - return iterator.isValid(); + final int namespaceBytesLength = namespaceBytes.length; + final int basicLength = namespaceBytesLength + keyGroupPrefixBytes; + while (nextKey == null && iterator.isValid()) { + try { + byte[] key = iterator.key(); + if (key.length >= basicLength) { + if (isMatchingNameSpace(key)) { + ByteArrayInputStreamWithPos inputStream = + new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes); + DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream); + K value = 
AbstractRocksDBState.AbstractRocksDBUtils.readKey( + keySerializer, + inputStream, + dataInput, + ambiguousKeyPossible); + nextKey = value; + } + } + iterator.next(); + } catch (IOException e) { + throw new FlinkRuntimeException("Failed to access state [" + state + "]", e); + } + } + return nextKey != null; } @Override public K next() { if (!hasNext()) { throw new NoSuchElementException("Failed to access state [" + state + "]"); } - try { - byte[] key = iterator.key(); - DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper( - new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes)); - K value = keySerializer.deserialize(dataInput); - iterator.next(); - return value; - } catch (IOException e) { - throw new FlinkRuntimeException("Failed to access state [" + state + "]", e); + + K tmpKey = nextKey; + nextKey = null; +
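The `hasNext()`/`next()` pair in the diff above follows a look-ahead pattern: `hasNext()` advances the underlying iterator until it finds an entry for the requested namespace and buffers it, and `next()` hands out the buffered entry. A minimal sketch of that pattern over plain byte arrays follows. It assumes the namespace bytes form the suffix of the composite key (consistent with the write order key group, key, namespace in `writeKeyWithGroupAndNamespace`). The names `NamespaceFilterSketch`, `FilteringIterator`, and `endsWith` are hypothetical, and the body of `isMatchingNameSpace` is not shown in the diff, so the suffix comparison here is an assumption.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class NamespaceFilterSketch {

    /** Returns true if the composite key ends with the serialized namespace bytes. */
    static boolean endsWith(byte[] key, byte[] suffix) {
        if (key.length < suffix.length) {
            return false;
        }
        int offset = key.length - suffix.length;
        for (int i = 0; i < suffix.length; i++) {
            if (key[offset + i] != suffix[i]) {
                return false;
            }
        }
        return true;
    }

    /** Look-ahead iterator that skips entries of other namespaces. Not thread-safe. */
    static final class FilteringIterator implements Iterator<byte[]> {
        private final Iterator<byte[]> raw;
        private final byte[] namespaceBytes;
        private byte[] nextKey;

        FilteringIterator(Iterator<byte[]> raw, byte[] namespaceBytes) {
            this.raw = raw;
            this.namespaceBytes = namespaceBytes;
        }

        @Override
        public boolean hasNext() {
            // Advance until a matching entry is buffered or the input is exhausted.
            while (nextKey == null && raw.hasNext()) {
                byte[] candidate = raw.next();
                if (endsWith(candidate, namespaceBytes)) {
                    nextKey = candidate;
                }
            }
            return nextKey != null;
        }

        @Override
        public byte[] next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            byte[] result = nextKey;
            nextKey = null;
            return result;
        }
    }

    /** Convenience helper: collects all keys that belong to the given namespace. */
    static List<byte[]> filter(List<byte[]> keys, byte[] namespaceBytes) {
        List<byte[]> result = new ArrayList<>();
        new FilteringIterator(keys.iterator(), namespaceBytes).forEachRemaining(result::add);
        return result;
    }
}
```

Buffering the next match inside `hasNext()` keeps repeated `hasNext()` calls idempotent, which a plain `iterator.isValid()` check cannot guarantee once filtering is involved.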
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169989983 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -207,6 +208,9 @@ /** Unique ID of this backend. */ private UUID backendUID; + /** The byte array for namespace serialization in getKeys(). */ + private final ByteArrayOutputStreamWithPos namespaceOutputStream; --- End diff -- I would rather create one of these per `getKeys(...)` call. I think that is cleaner than putting it into `RocksDBKeyedStateBackend`, because it is only used in `getKeys()` for now. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169988602 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java --- @@ -61,8 +61,9 @@ * over it keys are not supported. * @param state State variable for which existing keys will be returned. * @param namespace Namespace for which existing keys will be returned. +* @param namespaceSerializer the serializer for the namespace. */ -Stream getKeys(String state, N namespace); +Stream getKeys(String state, N namespace, TypeSerializer namespaceSerializer); --- End diff -- Aha, I also noticed that `RegisteredKeyedBackendStateMetaInfo` contains the namespace serializer. But one thing confuses me: it looks like one `ColumnFamilyHandle` can correspond to multiple namespace serializers. I am not sure about this, so I introduced the additional parameter. Am I misunderstanding something? Can one `ColumnFamilyHandle` correspond to only one namespace serializer? ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169957582 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -253,22 +257,61 @@ public RocksDBKeyedStateBackend( this.restoredKvStateMetaInfos = new HashMap<>(); this.materializedSstFiles = new TreeMap<>(); this.backendUID = UUID.randomUUID(); + this.namespaceOutputStream = new ByteArrayOutputStreamWithPos(8); LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } @Override - public Stream getKeys(String state, N namespace) { + public Stream getKeys(String state, N namespace, TypeSerializer namespaceSerializer) { Tuple2columnInfo = kvStateInformation.get(state); if (columnInfo == null) { return Stream.empty(); } - RocksIterator iterator = db.newIterator(columnInfo.f0); - iterator.seekToFirst(); + RocksIterator iterator = null; + try { + iterator = db.newIterator(columnInfo.f0); + iterator.seekToFirst(); + + boolean ambiguousKeyPossible = AbstractRocksDBState.AbstractRocksDBUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer); + final byte[] nameSpaceBytes; - Iterable iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes); - Stream targetStream = StreamSupport.stream(iterable.spliterator(), false); - return targetStream.onClose(iterator::close); + try { + namespaceOutputStream.reset(); + AbstractRocksDBState.AbstractRocksDBUtils.writeNameSpace( + namespace, + namespaceSerializer, + namespaceOutputStream, + new DataOutputViewStreamWrapper(namespaceOutputStream), + ambiguousKeyPossible); + nameSpaceBytes = namespaceOutputStream.toByteArray(); + } catch (IOException ex) { + throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex); + } + + final RocksIteratorWrapper iteratorWrapper = new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, + ambiguousKeyPossible, nameSpaceBytes); + + Stream targetStream = StreamSupport.stream(((Iterable)()->iteratorWrapper).spliterator(), false); --- End diff -- I think `Spliterators.spliteratorUnknownSize(iteratorWrapper, Spliterator.ORDERED)` might be the cleaner way to get a stream from the iterator. ---
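The suggestion above can be tried in isolation. The following is a minimal sketch, not the PR's code: `ClosableKeyIterator` is a hypothetical stand-in for `RocksIteratorWrapper` (an iterator that owns a resource and must be closed), and `toStream` is an illustrative helper name. Only `Spliterators.spliteratorUnknownSize`, `StreamSupport.stream`, and `Stream.onClose` are real JDK calls.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class IteratorToStreamSketch {

    /** Hypothetical stand-in for RocksIteratorWrapper: an iterator that owns a resource. */
    static final class ClosableKeyIterator implements Iterator<String>, AutoCloseable {
        private final Iterator<String> delegate;
        private boolean closed;

        ClosableKeyIterator(List<String> keys) {
            this.delegate = keys.iterator();
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public String next() {
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Wraps the iterator in a sequential stream and releases it when the stream is closed. */
    static Stream<String> toStream(ClosableKeyIterator iterator) {
        Spliterator<String> spliterator =
            Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
        return StreamSupport.stream(spliterator, false).onClose(iterator::close);
    }

    public static void main(String[] args) {
        ClosableKeyIterator iterator = new ClosableKeyIterator(Arrays.asList("a", "b", "c"));
        // try-with-resources closes the stream, which in turn runs the onClose handler.
        try (Stream<String> keys = toStream(iterator)) {
            System.out.println(keys.collect(Collectors.toList()));
        }
        System.out.println(iterator.isClosed());
    }
}
```

Compared with the `((Iterable) () -> iteratorWrapper).spliterator()` cast in the diff, this avoids an `Iterable` that hands out the same one-shot iterator on every call and states the `ORDERED` characteristic explicitly.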
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169950751 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java --- @@ -161,107 +160,131 @@ protected void writeKeyWithGroupAndNamespace( Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context."); keySerializationStream.reset(); - writeKeyGroup(keyGroup, keySerializationDataOutputView); - writeKey(key, keySerializationStream, keySerializationDataOutputView); - writeNameSpace(namespace, keySerializationStream, keySerializationDataOutputView); + AbstractRocksDBUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView); + AbstractRocksDBUtils.writeKey(key, backend.getKeySerializer(), keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible); + AbstractRocksDBUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible); } - private void writeKeyGroup( - int keyGroup, - DataOutputView keySerializationDateDataOutputView) throws IOException { - for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) { - keySerializationDateDataOutputView.writeByte(keyGroup >>> (i << 3)); - } + protected Tuple3readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, DataInputView inputView) throws IOException { + int keyGroup = AbstractRocksDBUtils.readKeyGroup(backend.getKeyGroupPrefixBytes(), inputView); + K key = AbstractRocksDBUtils.readKey(backend.getKeySerializer(), inputStream, inputView, ambiguousKeyPossible); + N namespace = AbstractRocksDBUtils.readNamespace(namespaceSerializer, inputStream, inputView, ambiguousKeyPossible); + + return new Tuple3<>(keyGroup, key, namespace); } - private void writeKey( - K key, - ByteArrayOutputStreamWithPos keySerializationStream, - DataOutputView 
keySerializationDataOutputView) throws IOException { - //write key - int beforeWrite = keySerializationStream.getPosition(); - backend.getKeySerializer().serialize(key, keySerializationDataOutputView); - - if (ambiguousKeyPossible) { - //write size of key - writeLengthFrom(beforeWrite, keySerializationStream, - keySerializationDataOutputView); + /** +* Utils for RocksDB state serialization and deserialization. +*/ + static class AbstractRocksDBUtils { --- End diff -- The name of this class already suggests that it might be better placed in its own file, maybe as `RocksDBKeySerializationUtils`. Now that the methods are public and used in different places, I also suggest adding a test to guard their behavior against accidental code changes. ---
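A guard test of the kind suggested above could start from a round trip of the key-group prefix. The sketch below is an assumption-laden illustration: the write loop mirrors the removed `writeKeyGroup` shown in the diff, while `readKeyGroup`, `roundTrip`, and the class name `KeyGroupPrefixSketch` are hypothetical, and JDK data streams stand in for Flink's `DataOutputView`/`DataInputView`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the key serialization utilities discussed above.
final class KeyGroupPrefixSketch {

    /** Writes the key group big-endian, using exactly prefixBytes bytes. */
    static void writeKeyGroup(int keyGroup, int prefixBytes, DataOutputStream out) throws IOException {
        for (int i = prefixBytes; --i >= 0;) {
            out.writeByte(keyGroup >>> (i << 3)); // writeByte keeps only the low 8 bits
        }
    }

    /** Assumed counterpart of writeKeyGroup: reads prefixBytes bytes, most significant first. */
    static int readKeyGroup(int prefixBytes, DataInputStream in) throws IOException {
        int keyGroup = 0;
        for (int i = 0; i < prefixBytes; ++i) {
            keyGroup = (keyGroup << 8) | (in.readByte() & 0xFF);
        }
        return keyGroup;
    }

    /** Serializes and deserializes a key group through an in-memory buffer. */
    static int roundTrip(int keyGroup, int prefixBytes) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writeKeyGroup(keyGroup, prefixBytes, new DataOutputStream(bytes));
            return readKeyGroup(prefixBytes,
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(127, 1));
        System.out.println(roundTrip(300, 2));
    }
}
```

A real test would call the extracted utility methods directly instead of these stand-ins, ideally for both one-byte and two-byte prefixes.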
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169949734 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +2034,87 @@ public int numStateEntries() { return count; } - private static class RocksIteratorWrapper implements Iterator { + /** +* This class is not thread safety. --- End diff -- `thread safe`, without the `ty` ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169949298 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +2034,87 @@ public int numStateEntries() { return count; } - private static class RocksIteratorWrapper implements Iterator { + /** +* This class is not thread safety. +*/ + static class RocksIteratorWrapper implements Iterator, AutoCloseable { private final RocksIterator iterator; private final String state; private final TypeSerializer keySerializer; private final int keyGroupPrefixBytes; + private final byte[] namespaceBytes; + private final boolean ambiguousKeyPossible; + private K nextKey; public RocksIteratorWrapper( RocksIterator iterator, String state, TypeSerializer keySerializer, - int keyGroupPrefixBytes) { + int keyGroupPrefixBytes, + boolean ambiguousKeyPossible, + byte[] namespaceBytes) { this.iterator = Preconditions.checkNotNull(iterator); this.state = Preconditions.checkNotNull(state); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes); + this.nextKey = null; + this.ambiguousKeyPossible = ambiguousKeyPossible; } @Override public boolean hasNext() { - return iterator.isValid(); + final int namespaceBytesLength = namespaceBytes.length; + final int basicLength = namespaceBytesLength + keyGroupPrefixBytes; + while (nextKey == null && iterator.isValid()) { + try { + byte[] key = iterator.key(); + if (key.length >= basicLength) { --- End diff -- The outer `if` could become part of the matcher function, or at least we can combine both `if` statements with `&&`. ---
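One way to read this suggestion is to fold the length check into the matcher itself. The following is a minimal sketch under that assumption; the class `NamespaceSuffixMatcherSketch` and the static signature are hypothetical, and the suffix comparison follows the loop from the earlier revision of `hasNext()` quoted elsewhere in this thread.

```java
// Hypothetical stand-alone version of the namespace matcher discussed above.
final class NamespaceSuffixMatcherSketch {

    /**
     * Returns true if the serialized key is long enough to hold the key-group prefix
     * plus the namespace, and its trailing bytes equal the serialized namespace.
     */
    static boolean isMatchingNamespace(byte[] key, int keyGroupPrefixBytes, byte[] namespaceBytes) {
        final int namespaceLength = namespaceBytes.length;
        // The former outer `if`: too-short keys can never match.
        if (key.length < keyGroupPrefixBytes + namespaceLength) {
            return false;
        }
        // Compare the suffix of the key with the namespace bytes, back to front.
        for (int i = 1; i <= namespaceLength; ++i) {
            if (key[key.length - i] != namespaceBytes[namespaceLength - i]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] key = {0, 1, 2, 9, 9}; // 1 key-group byte, 2 key bytes, 2 namespace bytes
        System.out.println(isMatchingNamespace(key, 1, new byte[] {9, 9}));
        System.out.println(isMatchingNamespace(key, 1, new byte[] {9, 8}));
    }
}
```

With the check inside the matcher, the loop body in `hasNext()` needs only a single `if`.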
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169948460 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksIterator; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.function.Function; + +import static org.mockito.Mockito.mock; + +/** + * Tests for the RocksIteratorWrapper. 
+ */ +public class RocksDBRocksIteratorWrapperTest { + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + @Test + public void testIterator() throws Exception{ + + // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == false + testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> i); + + // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == true + testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> String.valueOf(i)); + + // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == false + testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> i); + + // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == true + testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> String.valueOf(i)); + } + +void testIteratorHelper( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + int maxKeyGroupNumber, + FunctiongetKeyFunc) throws Exception { + + String testStateName = "aha"; + String namespace = "ns"; + + String dbPath = tmp.newFolder().getAbsolutePath(); + String checkpointPath = tmp.newFolder().toURI().toString(); + RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), true); + backend.setDbStoragePath(dbPath); + + Environment env = new DummyEnvironment("TestTask", 1, 0); + + RocksDBKeyedStateBackend keyedStateBackend = (RocksDBKeyedStateBackend) backend.createKeyedStateBackend( + env, + new JobID(), + "Test", + keySerializer, + maxKeyGroupNumber, + new KeyGroupRange(0, maxKeyGroupNumber - 1), + mock(TaskKvStateRegistry.class)); + + keyedStateBackend.restore(null); + + ValueState testState = keyedStateBackend.getPartitionedState( + namespace, +
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169948011 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksIterator; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.function.Function; + +import static org.mockito.Mockito.mock; + +/** + * Tests for the RocksIteratorWrapper. 
+ */ +public class RocksDBRocksIteratorWrapperTest { + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + @Test + public void testIterator() throws Exception{ + + // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == false + testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> i); + + // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == true + testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> String.valueOf(i)); + + // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == false + testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> i); + + // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == true + testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> String.valueOf(i)); + } + +void testIteratorHelper( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + int maxKeyGroupNumber, + FunctiongetKeyFunc) throws Exception { + + String testStateName = "aha"; + String namespace = "ns"; + + String dbPath = tmp.newFolder().getAbsolutePath(); + String checkpointPath = tmp.newFolder().toURI().toString(); + RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), true); + backend.setDbStoragePath(dbPath); + + Environment env = new DummyEnvironment("TestTask", 1, 0); + + RocksDBKeyedStateBackend keyedStateBackend = (RocksDBKeyedStateBackend) backend.createKeyedStateBackend( + env, + new JobID(), + "Test", + keySerializer, + maxKeyGroupNumber, + new KeyGroupRange(0, maxKeyGroupNumber - 1), + mock(TaskKvStateRegistry.class)); + + keyedStateBackend.restore(null); + + ValueState testState = keyedStateBackend.getPartitionedState( + namespace, +
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169947880 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksIterator; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.function.Function; + +import static org.mockito.Mockito.mock; + +/** + * Tests for the RocksIteratorWrapper. 
+ */ +public class RocksDBRocksIteratorWrapperTest { + + @Rule + public final TemporaryFolder tmp = new TemporaryFolder(); + + @Test + public void testIterator() throws Exception{ + + // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == false + testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> i); + + // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == true + testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 128, i -> String.valueOf(i)); + + // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == false + testIteratorHelper(IntSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> i); + + // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == true + testIteratorHelper(StringSerializer.INSTANCE, StringSerializer.INSTANCE, 256, i -> String.valueOf(i)); + } + +void testIteratorHelper( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + int maxKeyGroupNumber, + FunctiongetKeyFunc) throws Exception { + + String testStateName = "aha"; + String namespace = "ns"; + + String dbPath = tmp.newFolder().getAbsolutePath(); + String checkpointPath = tmp.newFolder().toURI().toString(); + RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), true); + backend.setDbStoragePath(dbPath); + + Environment env = new DummyEnvironment("TestTask", 1, 0); + + RocksDBKeyedStateBackend keyedStateBackend = (RocksDBKeyedStateBackend) backend.createKeyedStateBackend( --- End diff -- This object must be disposed in the end or other tests could eventually fail with JVM crashes. ---
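The disposal pattern asked for above is usually a `try`/`finally` around the test body. The sketch below only illustrates that shape: `NativeBackedResource` is a hypothetical stand-in for the keyed state backend, and in the real test the cleanup would presumably be a call on the backend itself (assumed here to be something like `keyedStateBackend.dispose()`).

```java
// Hypothetical illustration of disposing a native-backed object at the end of a test.
final class DisposeInFinallySketch {

    /** Stand-in for an object that owns native (off-heap) resources. */
    static final class NativeBackedResource {
        private boolean disposed;

        void use(boolean fail) {
            if (fail) {
                throw new IllegalStateException("simulated test failure");
            }
        }

        void dispose() {
            disposed = true;
        }

        boolean isDisposed() {
            return disposed;
        }
    }

    /** Runs the test body and always disposes the resource, even if the body throws. */
    static void runBody(NativeBackedResource resource, boolean fail) {
        try {
            resource.use(fail);
        } finally {
            resource.dispose();
        }
    }

    /** Reports whether the resource was disposed after the body ran. */
    static boolean disposedAfterRun(boolean fail) {
        NativeBackedResource resource = new NativeBackedResource();
        try {
            runBody(resource, fail);
        } catch (IllegalStateException e) {
            // A real test would let the failure propagate; it is caught here
            // only so the sketch can report the resource state.
        }
        return resource.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(disposedAfterRun(false));
        System.out.println(disposedAfterRun(true));
    }
}
```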
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169946803 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -253,22 +257,61 @@ public RocksDBKeyedStateBackend( this.restoredKvStateMetaInfos = new HashMap<>(); this.materializedSstFiles = new TreeMap<>(); this.backendUID = UUID.randomUUID(); + this.namespaceOutputStream = new ByteArrayOutputStreamWithPos(8); LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } @Override - public Stream getKeys(String state, N namespace) { + public Stream getKeys(String state, N namespace, TypeSerializer namespaceSerializer) { Tuple2columnInfo = kvStateInformation.get(state); if (columnInfo == null) { return Stream.empty(); } - RocksIterator iterator = db.newIterator(columnInfo.f0); - iterator.seekToFirst(); + RocksIterator iterator = null; + try { + iterator = db.newIterator(columnInfo.f0); + iterator.seekToFirst(); + + boolean ambiguousKeyPossible = AbstractRocksDBState.AbstractRocksDBUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer); + final byte[] nameSpaceBytes; - Iterable iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes); - Stream targetStream = StreamSupport.stream(iterable.spliterator(), false); - return targetStream.onClose(iterator::close); + try { + namespaceOutputStream.reset(); + AbstractRocksDBState.AbstractRocksDBUtils.writeNameSpace( + namespace, + namespaceSerializer, + namespaceOutputStream, + new DataOutputViewStreamWrapper(namespaceOutputStream), + ambiguousKeyPossible); + nameSpaceBytes = namespaceOutputStream.toByteArray(); + } catch (IOException ex) { + throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex); + } + + final RocksIteratorWrapper iteratorWrapper = new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, + ambiguousKeyPossible, nameSpaceBytes); + + Stream targetStream = StreamSupport.stream(((Iterable)()->iteratorWrapper).spliterator(), false); + return targetStream.onClose(() -> { + try { + iteratorWrapper.close(); + } catch (Exception ex) { + LOG.warn("Release RocksIteratorWrapper failed.", ex); + } + }); + } catch (Exception ex) { --- End diff -- As mentioned in my previous comment, we can solve this without any `try-catch` here, just create the native iterator further down, where no more exception can happen, i.e. ``` (...) RocksIterator iterator = db.newIterator(columnInfo.f0); iterator.seekToFirst(); final RocksIteratorWrapper iteratorWrapper = new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, ambiguousKeyPossible, nameSpaceBytes); Stream targetStream = StreamSupport.stream(((Iterable) () -> iteratorWrapper).spliterator(), false); return targetStream.onClose(iteratorWrapper::close); ``` ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169946359 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -253,22 +257,61 @@ public RocksDBKeyedStateBackend( this.restoredKvStateMetaInfos = new HashMap<>(); this.materializedSstFiles = new TreeMap<>(); this.backendUID = UUID.randomUUID(); + this.namespaceOutputStream = new ByteArrayOutputStreamWithPos(8); LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); } @Override - public Stream getKeys(String state, N namespace) { + public Stream getKeys(String state, N namespace, TypeSerializer namespaceSerializer) { Tuple2columnInfo = kvStateInformation.get(state); if (columnInfo == null) { return Stream.empty(); } - RocksIterator iterator = db.newIterator(columnInfo.f0); - iterator.seekToFirst(); + RocksIterator iterator = null; + try { + iterator = db.newIterator(columnInfo.f0); + iterator.seekToFirst(); + + boolean ambiguousKeyPossible = AbstractRocksDBState.AbstractRocksDBUtils.isAmbiguousKeyPossible(keySerializer, namespaceSerializer); + final byte[] nameSpaceBytes; - Iterable iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes); - Stream targetStream = StreamSupport.stream(iterable.spliterator(), false); - return targetStream.onClose(iterator::close); + try { + namespaceOutputStream.reset(); + AbstractRocksDBState.AbstractRocksDBUtils.writeNameSpace( + namespace, + namespaceSerializer, + namespaceOutputStream, + new DataOutputViewStreamWrapper(namespaceOutputStream), + ambiguousKeyPossible); + nameSpaceBytes = namespaceOutputStream.toByteArray(); + } catch (IOException ex) { + throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex); + } + + final RocksIteratorWrapper iteratorWrapper = new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, + ambiguousKeyPossible, nameSpaceBytes); + + Stream targetStream = StreamSupport.stream(((Iterable)()->iteratorWrapper).spliterator(), false); + return targetStream.onClose(() -> { + try { + iteratorWrapper.close(); + } catch (Exception ex) { + LOG.warn("Release RocksIteratorWrapper failed.", ex); + } + }); + } catch (Exception ex) { --- End diff -- As mentioned in my previous comment, this `try-catch` block is not required when we create the iterator further down, where no more exceptions can happen, i.e. ``` (...) RocksIterator iterator = db.newIterator(columnInfo.f0); iterator.seekToFirst(); final RocksIteratorWrapper iteratorWrapper = new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, ambiguousKeyPossible, nameSpaceBytes); Stream targetStream = StreamSupport.stream(((Iterable) () -> iteratorWrapper).spliterator(), false); return targetStream.onClose(iterator::close); ``` ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169931821 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +2034,87 @@ public int numStateEntries() { return count; } - private static class RocksIteratorWrapper implements Iterator { + /** +* This class is not thread safety. +*/ + static class RocksIteratorWrapper implements Iterator, AutoCloseable { private final RocksIterator iterator; private final String state; private final TypeSerializer keySerializer; private final int keyGroupPrefixBytes; + private final byte[] namespaceBytes; + private final boolean ambiguousKeyPossible; + private K nextKey; public RocksIteratorWrapper( RocksIterator iterator, String state, TypeSerializer keySerializer, - int keyGroupPrefixBytes) { + int keyGroupPrefixBytes, + boolean ambiguousKeyPossible, + byte[] namespaceBytes) { this.iterator = Preconditions.checkNotNull(iterator); this.state = Preconditions.checkNotNull(state); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes); + this.nextKey = null; + this.ambiguousKeyPossible = ambiguousKeyPossible; } @Override public boolean hasNext() { - return iterator.isValid(); + final int namespaceBytesLength = namespaceBytes.length; + final int basicLength = namespaceBytesLength + keyGroupPrefixBytes; + while (nextKey == null && iterator.isValid()) { + try { + byte[] key = iterator.key(); + if (key.length >= basicLength) { + if (isMatchingNameSpace(key)) { + ByteArrayInputStreamWithPos inputStream = + new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes); + DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream); + K value = 
AbstractRocksDBState.AbstractRocksDBUtils.readKey( + keySerializer, + inputStream, + dataInput, + ambiguousKeyPossible); + nextKey = value; + } + } + iterator.next(); + } catch (IOException e) { + throw new FlinkRuntimeException("Failed to access state [" + state + "]", e); + } + } + return nextKey != null; } @Override public K next() { if (!hasNext()) { throw new NoSuchElementException("Failed to access state [" + state + "]"); } - try { - byte[] key = iterator.key(); - DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper( - new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes)); - K value = keySerializer.deserialize(dataInput); - iterator.next(); - return value; - } catch (IOException e) { - throw new FlinkRuntimeException("Failed to access state [" + state + "]", e); + + K tmpKey = nextKey; + nextKey = null;
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169905515 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -207,6 +208,9 @@ /** Unique ID of this backend. */ private UUID backendUID; + /** The byte array for namespace serialization in getKeys(). */ + private final ByteArrayOutputStreamWithPos namespaceOutputStream; --- End diff -- It feels like this member has too broad a scope. While this maximizes caching, I wonder if creating one of these per `getKeys(...)` call would be cleaner and still efficient enough. What do you think? ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169904243 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java --- @@ -61,8 +61,9 @@ * over it keys are not supported. * @param state State variable for which existing keys will be returned. * @param namespace Namespace for which existing keys will be returned. +* @param namespaceSerializer the serializer for the namespace. */ -Stream getKeys(String state, N namespace); +Stream getKeys(String state, N namespace, TypeSerializer namespaceSerializer); --- End diff -- I noticed that introducing this additional parameter is actually not required. It is only used in RocksDB, where we can also get the namespace serializer in `getKeys(...)` via ``` Tuple2columnInfo = kvStateInformation.get(state); if (columnInfo == null) { return Stream.empty(); } ... columnInfo.f1.getNamespaceSerializer(); ``` ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169371872 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +1999,71 @@ public int numStateEntries() { return count; } + /** +* This class is not thread safety. +*/ private static class RocksIteratorWrapper implements Iterator { private final RocksIterator iterator; private final String state; private final TypeSerializer keySerializer; private final int keyGroupPrefixBytes; + private final byte[] namespaceBytes; + private K nextKey; public RocksIteratorWrapper( RocksIterator iterator, String state, TypeSerializer keySerializer, - int keyGroupPrefixBytes) { + int keyGroupPrefixBytes, + byte[] namespaceBytes) { this.iterator = Preconditions.checkNotNull(iterator); this.state = Preconditions.checkNotNull(state); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes); + this.nextKey = null; } @Override public boolean hasNext() { - return iterator.isValid(); + final int namespaceBytesLength = namespaceBytes.length; + while (nextKey == null && iterator.isValid()) { + try { + boolean namespaceValid = true; + byte[] key = iterator.key(); + if (key.length >= namespaceBytesLength + keyGroupPrefixBytes) { + for (int i = 1; i <= namespaceBytesLength; ++i) { + if (key[key.length - i] != namespaceBytes[namespaceBytesLength - i]) { + namespaceValid = false; + break; + } + } + if (namespaceValid) { + DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper( --- End diff -- Oh, yes sorry you are right, I was confused :-) ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r16937 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +1999,71 @@ public int numStateEntries() { return count; } + /** +* This class is not thread safety. +*/ private static class RocksIteratorWrapper implements Iterator { private final RocksIterator iterator; private final String state; private final TypeSerializer keySerializer; private final int keyGroupPrefixBytes; + private final byte[] namespaceBytes; + private K nextKey; public RocksIteratorWrapper( RocksIterator iterator, String state, TypeSerializer keySerializer, - int keyGroupPrefixBytes) { + int keyGroupPrefixBytes, + byte[] namespaceBytes) { this.iterator = Preconditions.checkNotNull(iterator); this.state = Preconditions.checkNotNull(state); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes); + this.nextKey = null; } @Override public boolean hasNext() { - return iterator.isValid(); + final int namespaceBytesLength = namespaceBytes.length; + while (nextKey == null && iterator.isValid()) { + try { + boolean namespaceValid = true; + byte[] key = iterator.key(); + if (key.length >= namespaceBytesLength + keyGroupPrefixBytes) { + for (int i = 1; i <= namespaceBytesLength; ++i) { + if (key[key.length - i] != namespaceBytes[namespaceBytesLength - i]) { + namespaceValid = false; + break; + } + } + if (namespaceValid) { + DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper( --- End diff -- Aha, sorry, I am a bit confused about "`ByteArrayInputStreamWithPos` can also grow internally?" Do you mean `ByteArrayOutputStreamWithPos`? ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169369352 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java --- @@ -211,24 +211,55 @@ protected CheckpointStreamFactory createStreamFactory() throws Exception { @Test public void testGetKeys() throws Exception { - final int elementsToTest = 1000; + final int namespace1ElementsNum = 1000; + final int namespace2ElementsNum = 1000; String fieldName = "get-keys-test"; AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE); try { - ValueState keyedState = backend.getOrCreateKeyedState( - VoidNamespaceSerializer.INSTANCE, - new ValueStateDescriptor<>(fieldName, IntSerializer.INSTANCE)); - ((InternalValueState) keyedState).setCurrentNamespace(VoidNamespace.INSTANCE); + final String ns1 = "ns1"; + ValueState keyedState1 = backend.getPartitionedState( + ns1, + StringSerializer.INSTANCE, + new ValueStateDescriptor<>(fieldName, IntSerializer.INSTANCE) + ); + + ((InternalValueState ) keyedState1).setCurrentNamespace(ns1); + + for (int key = 0; key < namespace1ElementsNum; key++) { + backend.setCurrentKey(key); + keyedState1.update(key * 2); + } + + ValueState keyedState2 = backend.getPartitionedState( + ns1, --- End diff -- addressed. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169369280 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +1999,71 @@ public int numStateEntries() { return count; } + /** +* This class is not thread safety. +*/ private static class RocksIteratorWrapper implements Iterator { private final RocksIterator iterator; private final String state; private final TypeSerializer keySerializer; private final int keyGroupPrefixBytes; + private final byte[] namespaceBytes; + private K nextKey; public RocksIteratorWrapper( RocksIterator iterator, String state, TypeSerializer keySerializer, - int keyGroupPrefixBytes) { + int keyGroupPrefixBytes, + byte[] namespaceBytes) { this.iterator = Preconditions.checkNotNull(iterator); this.state = Preconditions.checkNotNull(state); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes); + this.nextKey = null; } @Override public boolean hasNext() { - return iterator.isValid(); + final int namespaceBytesLength = namespaceBytes.length; + while (nextKey == null && iterator.isValid()) { + try { + boolean namespaceValid = true; + byte[] key = iterator.key(); + if (key.length >= namespaceBytesLength + keyGroupPrefixBytes) { + for (int i = 1; i <= namespaceBytesLength; ++i) { + if (key[key.length - i] != namespaceBytes[namespaceBytesLength - i]) { + namespaceValid = false; + break; + } + } + if (namespaceValid) { + DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper( + new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes)); + K value = keySerializer.deserialize(dataInput); + if (dataInput.available() == namespaceBytesLength) { --- End diff -- You are right! 
And I am inclined to remove it now ... ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169368766 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +1999,71 @@ public int numStateEntries() { return count; } + /** +* This class is not thread safety. +*/ private static class RocksIteratorWrapper implements Iterator { private final RocksIterator iterator; private final String state; private final TypeSerializer keySerializer; private final int keyGroupPrefixBytes; + private final byte[] namespaceBytes; + private K nextKey; public RocksIteratorWrapper( RocksIterator iterator, String state, TypeSerializer keySerializer, - int keyGroupPrefixBytes) { + int keyGroupPrefixBytes, + byte[] namespaceBytes) { this.iterator = Preconditions.checkNotNull(iterator); this.state = Preconditions.checkNotNull(state); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes); + this.nextKey = null; } @Override public boolean hasNext() { - return iterator.isValid(); + final int namespaceBytesLength = namespaceBytes.length; + while (nextKey == null && iterator.isValid()) { + try { + boolean namespaceValid = true; + byte[] key = iterator.key(); + if (key.length >= namespaceBytesLength + keyGroupPrefixBytes) { --- End diff -- addressed. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169368813 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +1999,71 @@ public int numStateEntries() { return count; } + /** +* This class is not thread safety. +*/ private static class RocksIteratorWrapper implements Iterator { private final RocksIterator iterator; private final String state; private final TypeSerializer keySerializer; private final int keyGroupPrefixBytes; + private final byte[] namespaceBytes; + private K nextKey; public RocksIteratorWrapper( RocksIterator iterator, String state, TypeSerializer keySerializer, - int keyGroupPrefixBytes) { + int keyGroupPrefixBytes, + byte[] namespaceBytes) { this.iterator = Preconditions.checkNotNull(iterator); this.state = Preconditions.checkNotNull(state); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes); + this.nextKey = null; } @Override public boolean hasNext() { - return iterator.isValid(); + final int namespaceBytesLength = namespaceBytes.length; + while (nextKey == null && iterator.isValid()) { + try { + boolean namespaceValid = true; + byte[] key = iterator.key(); + if (key.length >= namespaceBytesLength + keyGroupPrefixBytes) { --- End diff -- addressed. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169368679 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +1999,71 @@ public int numStateEntries() { return count; } + /** +* This class is not thread safety. +*/ private static class RocksIteratorWrapper implements Iterator { private final RocksIterator iterator; private final String state; private final TypeSerializer keySerializer; private final int keyGroupPrefixBytes; + private final byte[] namespaceBytes; + private K nextKey; public RocksIteratorWrapper( RocksIterator iterator, String state, TypeSerializer keySerializer, - int keyGroupPrefixBytes) { + int keyGroupPrefixBytes, + byte[] namespaceBytes) { this.iterator = Preconditions.checkNotNull(iterator); this.state = Preconditions.checkNotNull(state); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes); + this.nextKey = null; } @Override public boolean hasNext() { - return iterator.isValid(); + final int namespaceBytesLength = namespaceBytes.length; --- End diff -- Added a unit test `RocksDBRocksIteratorWrapperTest`. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169368440 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +1999,71 @@ public int numStateEntries() { return count; } + /** +* This class is not thread safety. +*/ private static class RocksIteratorWrapper implements Iterator { --- End diff -- Good point, addressed. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169368355 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -266,9 +267,16 @@ public RocksDBKeyedStateBackend( RocksIterator iterator = db.newIterator(columnInfo.f0); iterator.seekToFirst(); - Iterable iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes); - Stream targetStream = StreamSupport.stream(iterable.spliterator(), false); - return targetStream.onClose(iterator::close); + try { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(8); + namespaceSerializer.serialize(namespace, new DataOutputViewStreamWrapper(outputStream)); + final byte[] namespaceBytes = outputStream.toByteArray(); + Iterable iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, namespaceBytes); + Stream targetStream = StreamSupport.stream(iterable.spliterator(), false); + return targetStream.onClose(iterator::close); + } catch (IOException ex) { + throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex); --- End diff -- Nice catch! Addressed. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169367934 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -266,9 +267,16 @@ public RocksDBKeyedStateBackend( RocksIterator iterator = db.newIterator(columnInfo.f0); iterator.seekToFirst(); - Iterable iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes); - Stream targetStream = StreamSupport.stream(iterable.spliterator(), false); - return targetStream.onClose(iterator::close); + try { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(8); --- End diff -- addressed. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169367889 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java --- @@ -62,7 +62,7 @@ * @param state State variable for which existing keys will be returned. * @param namespace Namespace for which existing keys will be returned. */ -Stream getKeys(String state, N namespace); +Stream getKeys(String state, N namespace, TypeSerializer namespaceSerializer); --- End diff -- addressed. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169065861 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java --- @@ -211,24 +211,55 @@ protected CheckpointStreamFactory createStreamFactory() throws Exception { @Test public void testGetKeys() throws Exception { - final int elementsToTest = 1000; + final int namespace1ElementsNum = 1000; + final int namespace2ElementsNum = 1000; String fieldName = "get-keys-test"; AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE); try { - ValueState keyedState = backend.getOrCreateKeyedState( - VoidNamespaceSerializer.INSTANCE, - new ValueStateDescriptor<>(fieldName, IntSerializer.INSTANCE)); - ((InternalValueState) keyedState).setCurrentNamespace(VoidNamespace.INSTANCE); + final String ns1 = "ns1"; + ValueState keyedState1 = backend.getPartitionedState( + ns1, + StringSerializer.INSTANCE, + new ValueStateDescriptor<>(fieldName, IntSerializer.INSTANCE) + ); + + ((InternalValueState ) keyedState1).setCurrentNamespace(ns1); + + for (int key = 0; key < namespace1ElementsNum; key++) { + backend.setCurrentKey(key); + keyedState1.update(key * 2); + } + + ValueState keyedState2 = backend.getPartitionedState( + ns1, --- End diff -- If you give `ns2` here you don't have to call `setCurrentNamespace()` later. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169072232 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +1999,71 @@ public int numStateEntries() { return count; } + /** +* This class is not thread safety. +*/ private static class RocksIteratorWrapper implements Iterator { --- End diff -- One suggestion about something that was not introduced in this PR: we could make `RocksIteratorWrapper` implement `AutoCloseable` and use that close method instead of calling close directly on the RocksDB iterator where this is used. I think this is cleaner, because the wrapper should own the Rocks iterator. ---
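The ownership idea in the comment above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code that was merged: `NativeCursor` is a hypothetical stand-in for the RocksDB iterator, and `OwningIteratorWrapper` is an invented name. The point is that the wrapper, not the caller, closes the underlying iterator.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;

/** Hypothetical stand-in for a native iterator such as the RocksDB one. */
interface NativeCursor extends AutoCloseable {
    boolean isValid();

    byte[] key();

    void next();

    @Override
    void close();
}

/** Wrapper that owns the cursor: closing the wrapper closes the cursor. */
final class OwningIteratorWrapper implements Iterator<byte[]>, AutoCloseable {

    private final NativeCursor cursor;

    OwningIteratorWrapper(NativeCursor cursor) {
        this.cursor = Objects.requireNonNull(cursor);
    }

    @Override
    public boolean hasNext() {
        return cursor.isValid();
    }

    @Override
    public byte[] next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        byte[] current = cursor.key();
        cursor.next(); // advance only after the current key was read
        return current;
    }

    @Override
    public void close() {
        // Callers never touch the cursor directly; the wrapper releases it.
        cursor.close();
    }
}
```

With such a wrapper, a stream built on top of it could register `wrapper::close` in `onClose(...)` rather than a method reference to the native iterator.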
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169076426 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +1999,71 @@ public int numStateEntries() { return count; } + /** +* This class is not thread safety. +*/ private static class RocksIteratorWrapper implements Iterator { private final RocksIterator iterator; private final String state; private final TypeSerializer keySerializer; private final int keyGroupPrefixBytes; + private final byte[] namespaceBytes; + private K nextKey; public RocksIteratorWrapper( RocksIterator iterator, String state, TypeSerializer keySerializer, - int keyGroupPrefixBytes) { + int keyGroupPrefixBytes, + byte[] namespaceBytes) { this.iterator = Preconditions.checkNotNull(iterator); this.state = Preconditions.checkNotNull(state); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes); + this.nextKey = null; } @Override public boolean hasNext() { - return iterator.isValid(); + final int namespaceBytesLength = namespaceBytes.length; + while (nextKey == null && iterator.isValid()) { + try { + boolean namespaceValid = true; + byte[] key = iterator.key(); + if (key.length >= namespaceBytesLength + keyGroupPrefixBytes) { --- End diff -- We could decompose this loop by introducing a helper method `isMatchingNameSpace(byte[] key, byte[] nameSpaceBytes)`. I think that makes the control flow easier to read. ---
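A helper along the lines suggested above might look like the sketch below. It takes the suffix comparison from the diff and moves it into one method. The class name `NamespaceMatcher` and the third parameter `keyGroupPrefixBytes` are assumptions added so that the example is a self-contained static method; the review comment itself only names `isMatchingNameSpace(byte[] key, byte[] nameSpaceBytes)`.

```java
final class NamespaceMatcher {

    private NamespaceMatcher() {}

    /**
     * Returns true if the trailing bytes of {@code key} equal {@code namespaceBytes}
     * and the key is long enough to also hold the key-group prefix.
     */
    static boolean isMatchingNamespace(byte[] key, byte[] namespaceBytes, int keyGroupPrefixBytes) {
        final int namespaceBytesLength = namespaceBytes.length;
        if (key.length < namespaceBytesLength + keyGroupPrefixBytes) {
            return false; // too short to contain prefix plus namespace
        }
        // Compare from the end of the key, as the loop in the diff does.
        for (int i = 1; i <= namespaceBytesLength; ++i) {
            if (key[key.length - i] != namespaceBytes[namespaceBytesLength - i]) {
                return false;
            }
        }
        return true;
    }
}
```

The `hasNext()` loop would then reduce to "advance until `isMatchingNamespace(...)` is true", which keeps the byte comparison out of the control flow.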
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169067403 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -266,9 +267,16 @@ public RocksDBKeyedStateBackend( RocksIterator iterator = db.newIterator(columnInfo.f0); iterator.seekToFirst(); - Iterable iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes); - Stream targetStream = StreamSupport.stream(iterable.spliterator(), false); - return targetStream.onClose(iterator::close); + try { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(8); --- End diff -- It would be better to use our `ByteArrayOutputStreamWithPos`, which drops all the unnecessary synchronization. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169074775 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +1999,71 @@ public int numStateEntries() { return count; } + /** +* This class is not thread safety. +*/ private static class RocksIteratorWrapper implements Iterator { private final RocksIterator iterator; private final String state; private final TypeSerializer keySerializer; private final int keyGroupPrefixBytes; + private final byte[] namespaceBytes; + private K nextKey; public RocksIteratorWrapper( RocksIterator iterator, String state, TypeSerializer keySerializer, - int keyGroupPrefixBytes) { + int keyGroupPrefixBytes, + byte[] namespaceBytes) { this.iterator = Preconditions.checkNotNull(iterator); this.state = Preconditions.checkNotNull(state); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes); + this.nextKey = null; } @Override public boolean hasNext() { - return iterator.isValid(); + final int namespaceBytesLength = namespaceBytes.length; + while (nextKey == null && iterator.isValid()) { + try { + boolean namespaceValid = true; + byte[] key = iterator.key(); + if (key.length >= namespaceBytesLength + keyGroupPrefixBytes) { + for (int i = 1; i <= namespaceBytesLength; ++i) { + if (key[key.length - i] != namespaceBytes[namespaceBytesLength - i]) { + namespaceValid = false; + break; + } + } + if (namespaceValid) { + DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper( --- End diff -- This creates a lot of short-lived objects, and the `ByteArrayInputStreamWithPos` can also grow internally. I wonder whether we could at least always reuse the same `ByteArrayInputStreamWithPos`?
This would also let the internal array eventually settle at a good size. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169066387 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java --- @@ -62,7 +62,7 @@ * @param state State variable for which existing keys will be returned. * @param namespace Namespace for which existing keys will be returned. */ -Stream getKeys(String state, N namespace); +Stream getKeys(String state, N namespace, TypeSerializer namespaceSerializer); --- End diff -- New parameter should be referenced in JavaDoc. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169072840 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +1999,71 @@ public int numStateEntries() { return count; } + /** +* This class is not thread safety. +*/ private static class RocksIteratorWrapper implements Iterator { private final RocksIterator iterator; private final String state; private final TypeSerializer keySerializer; private final int keyGroupPrefixBytes; + private final byte[] namespaceBytes; + private K nextKey; public RocksIteratorWrapper( RocksIterator iterator, String state, TypeSerializer keySerializer, - int keyGroupPrefixBytes) { + int keyGroupPrefixBytes, + byte[] namespaceBytes) { this.iterator = Preconditions.checkNotNull(iterator); this.state = Preconditions.checkNotNull(state); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes); + this.nextKey = null; } @Override public boolean hasNext() { - return iterator.isValid(); + final int namespaceBytesLength = namespaceBytes.length; --- End diff -- This method became pretty complex now, so maybe we should have a unit test for it, similar to e.g. `RocksDBMergeIteratorTest`? In particular, it should cover corner cases and different keygroup prefix byte sizes. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169077434 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +1999,71 @@ public int numStateEntries() { return count; } + /** +* This class is not thread safety. +*/ private static class RocksIteratorWrapper implements Iterator { private final RocksIterator iterator; private final String state; private final TypeSerializer keySerializer; private final int keyGroupPrefixBytes; + private final byte[] namespaceBytes; + private K nextKey; public RocksIteratorWrapper( RocksIterator iterator, String state, TypeSerializer keySerializer, - int keyGroupPrefixBytes) { + int keyGroupPrefixBytes, + byte[] namespaceBytes) { this.iterator = Preconditions.checkNotNull(iterator); this.state = Preconditions.checkNotNull(state); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes); + this.nextKey = null; } @Override public boolean hasNext() { - return iterator.isValid(); + final int namespaceBytesLength = namespaceBytes.length; + while (nextKey == null && iterator.isValid()) { + try { + boolean namespaceValid = true; + byte[] key = iterator.key(); + if (key.length >= namespaceBytesLength + keyGroupPrefixBytes) { + for (int i = 1; i <= namespaceBytesLength; ++i) { + if (key[key.length - i] != namespaceBytes[namespaceBytesLength - i]) { + namespaceValid = false; + break; + } + } + if (namespaceValid) { + DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper( + new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes)); + K value = keySerializer.deserialize(dataInput); + if (dataInput.available() == namespaceBytesLength) { --- End diff -- What is the purpose of this 
`if`? It looks more like a sanity check, but at the same time will silently drop data if it is triggered. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169075007 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -1991,43 +1999,71 @@ public int numStateEntries() { return count; } + /** +* This class is not thread safety. +*/ private static class RocksIteratorWrapper implements Iterator { private final RocksIterator iterator; private final String state; private final TypeSerializer keySerializer; private final int keyGroupPrefixBytes; + private final byte[] namespaceBytes; + private K nextKey; public RocksIteratorWrapper( RocksIterator iterator, String state, TypeSerializer keySerializer, - int keyGroupPrefixBytes) { + int keyGroupPrefixBytes, + byte[] namespaceBytes) { this.iterator = Preconditions.checkNotNull(iterator); this.state = Preconditions.checkNotNull(state); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); + this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes); + this.nextKey = null; } @Override public boolean hasNext() { - return iterator.isValid(); + final int namespaceBytesLength = namespaceBytes.length; + while (nextKey == null && iterator.isValid()) { + try { + boolean namespaceValid = true; + byte[] key = iterator.key(); + if (key.length >= namespaceBytesLength + keyGroupPrefixBytes) { --- End diff -- `namespaceBytesLength + keyGroupPrefixBytes` is basically a constant after the constructor. ---
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5518#discussion_r169069584 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -266,9 +267,16 @@ public RocksDBKeyedStateBackend( RocksIterator iterator = db.newIterator(columnInfo.f0); iterator.seekToFirst(); - Iterable iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes); - Stream targetStream = StreamSupport.stream(iterable.spliterator(), false); - return targetStream.onClose(iterator::close); + try { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(8); + namespaceSerializer.serialize(namespace, new DataOutputViewStreamWrapper(outputStream)); + final byte[] namespaceBytes = outputStream.toByteArray(); + Iterable iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, namespaceBytes); + Stream targetStream = StreamSupport.stream(iterable.spliterator(), false); + return targetStream.onClose(iterator::close); + } catch (IOException ex) { + throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex); --- End diff -- This case would not call `iterator.close()`. I suggest to just create everything related to `namespaceBytes` before creating the iterator, so that the iterator is only created when no further exception should happen. ---
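The ordering suggested above (do everything that can throw first, open the iterator last) can be sketched as below. This is a simplified illustration using only JDK classes: `Writer` is a hypothetical stand-in for Flink's `TypeSerializer`, and the `openResource` function stands in for creating the RocksDB iterator and wrapping it. Because serialization happens before the resource is opened, a serialization failure cannot leak an unclosed iterator.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Function;

final class OpenAfterSerialize {

    private OpenAfterSerialize() {}

    /** Hypothetical serializer abstraction; not Flink's TypeSerializer. */
    interface Writer<N> {
        void write(N value, DataOutputStream out) throws IOException;
    }

    /**
     * Serializes the namespace first and opens the resource only afterwards,
     * so nothing has been opened yet if serialization fails.
     */
    static <N, R> R open(N namespace, Writer<N> writer, Function<byte[], R> openResource) {
        final byte[] namespaceBytes;
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream(8);
            writer.write(namespace, new DataOutputStream(buffer));
            namespaceBytes = buffer.toByteArray();
        } catch (IOException ex) {
            throw new UncheckedIOException("Failed to serialize namespace.", ex);
        }
        // Only reached when serialization succeeded.
        return openResource.apply(namespaceBytes);
    }
}
```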
[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...
GitHub user sihuazhou opened a pull request: https://github.com/apache/flink/pull/5518 [FLINK-8679][State Backends]Fix RocksDBKeyedBackend.getKeys() bug for missing namespace condition. This PR addresses issue [FLINK-8679](https://issues.apache.org/jira/browse/FLINK-8679). Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` behaves oddly: it does not use the namespace to filter data. This leads to problems when one key corresponds to different namespaces. ## Brief change log - Modify RocksDBKeyedStateBackend.getKeys() to filter data according to namespace. ## Verifying this change - This change can be verified by the unit test in StateBackendTestBase.testGetKeys(). ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) You can merge this pull request into a Git repository by running: $ git pull https://github.com/sihuazhou/flink fix_rocksdb_getkeys Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5518.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5518 commit ec861f44c7dd9fb136fdc682154992f115288f77 Author: sihuazhou Date: 2018-02-16T17:26:12Z Fix getKeys() in RocksDBKeyStateBackend. commit 7b0069421aa4e484aba2a97db2e4c6b3cd88f058 Author: sihuazhou Date: 2018-02-17T02:29:59Z Fix loop bug in `getKeys()`. commit d50cf63a7da42ebcf5e6ad08e3bbe28b462b1f7c Author: sihuazhou Date: 2018-02-17T14:13:14Z add test case for different namespace. ---
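The filtering behaviour described in this pull request relies on a look-ahead pattern: `hasNext()` skips non-matching entries and buffers the next match, and `next()` hands the buffered element out. A generic sketch of that pattern, independent of RocksDB, could look like this. `LookAheadFilterIterator` is an invented name, and the predicate stands in for the namespace check; like the `nextKey` field in the diff, the sketch uses `null` as the "nothing buffered" marker, so it does not support `null` elements.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Predicate;

/** Iterator that only returns elements of the source accepted by a predicate. */
final class LookAheadFilterIterator<T> implements Iterator<T> {

    private final Iterator<T> source;
    private final Predicate<T> accept;
    private T nextElement; // null means that nothing is buffered

    LookAheadFilterIterator(Iterator<T> source, Predicate<T> accept) {
        this.source = Objects.requireNonNull(source);
        this.accept = Objects.requireNonNull(accept);
    }

    @Override
    public boolean hasNext() {
        // Advance until a matching element is buffered or the source is exhausted.
        while (nextElement == null && source.hasNext()) {
            T candidate = source.next();
            if (accept.test(candidate)) {
                nextElement = candidate;
            }
        }
        return nextElement != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = nextElement;
        nextElement = null; // consume the buffered element
        return result;
    }
}
```

Calling `hasNext()` repeatedly is safe in this design because it does not advance the source again once an element is buffered.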