[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374175#comment-16374175
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5518


> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with 
> namespace
> 
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` behaves oddly: it 
> doesn't use the namespace to filter data, whereas 
> `HeapKeyedBackend.getKeys(stateName, namespace)` does. I think they 
> should at least be consistent.
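
The difference described in the report can be illustrated with a minimal sketch. It does not use Flink's classes: `Entry`, `getKeysIgnoringNamespace` and `getKeysForNamespace` are hypothetical names that only model the two behaviours over an in-memory list.

```java
import java.util.List;
import java.util.stream.Stream;

final class GetKeysSemantics {

    /** Hypothetical model of one state entry: a key stored under a namespace. */
    static final class Entry {
        final String key;
        final String namespace;

        Entry(String key, String namespace) {
            this.key = key;
            this.namespace = namespace;
        }
    }

    /** Behaviour described in the report: the namespace argument is ignored. */
    static Stream<String> getKeysIgnoringNamespace(List<Entry> entries, String namespace) {
        return entries.stream().map(e -> e.key);
    }

    /** Expected behaviour: only keys stored under the requested namespace are returned. */
    static Stream<String> getKeysForNamespace(List<Entry> entries, String namespace) {
        return entries.stream()
            .filter(e -> e.namespace.equals(namespace))
            .map(e -> e.key);
    }
}
```

With entries `(a, ns1)`, `(b, ns2)`, `(c, ns1)` and the namespace `ns1`, the first method yields all three keys while the second yields only `a` and `c`.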



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374152#comment-16374152
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5518
  
Thanks a lot for your efforts, @sihuazhou! The changes look good now and I will 
merge this.




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372946#comment-16372946
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169993867
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 ---
@@ -61,8 +61,9 @@
 * over it keys are not supported.
 * @param state State variable for which existing keys will be returned.
 * @param namespace Namespace for which existing keys will be returned.
+* @param namespaceSerializer the serializer for the namespace.
 */
-<N> Stream<K> getKeys(String state, N namespace);
+<N> Stream<K> getKeys(String state, N namespace, TypeSerializer<N> 
namespaceSerializer);
--- End diff --

Checked! I misunderstood something. `getPartitionedState` 
confused me: it only resets the `namespace`, not the 
`namespaceSerializer`, for the `previous` state. Please just ignore me... 
Addressing this.




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372938#comment-16372938
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169991823
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -207,6 +208,9 @@
/** Unique ID of this backend. */
private UUID backendUID;
 
+   /** The byte array for namespace serialization in getKeys(). */
+   private final ByteArrayOutputStreamWithPos namespaceOutputStream;
--- End diff --

Ok, sounds good  




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372937#comment-16372937
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169991617
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 ---
@@ -61,8 +61,9 @@
 * over it keys are not supported.
 * @param state State variable for which existing keys will be returned.
 * @param namespace Namespace for which existing keys will be returned.
+* @param namespaceSerializer the serializer for the namespace.
 */
-<N> Stream<K> getKeys(String state, N namespace);
+<N> Stream<K> getKeys(String state, N namespace, TypeSerializer<N> 
namespaceSerializer);
--- End diff --

Currently, each registered state has its own column family, so that should 
be ok. What made you think that this does not hold? (asking in case you found 
something that should be fixed)




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372932#comment-16372932
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169990980
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +2034,87 @@ public int numStateEntries() {
return count;
}
 
-   private static class RocksIteratorWrapper implements Iterator {
+   /**
+* This class is not thread safety.
+*/
+   static class RocksIteratorWrapper implements Iterator, 
AutoCloseable {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private final boolean ambiguousKeyPossible;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   boolean ambiguousKeyPossible,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
+   this.ambiguousKeyPossible = ambiguousKeyPossible;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   final int basicLength = namespaceBytesLength + 
keyGroupPrefixBytes;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   byte[] key = iterator.key();
+   if (key.length >= basicLength) {
--- End diff --

Addressing
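
The look-ahead filtering in `hasNext()` above can be sketched in isolation. This is a simplified model, not the code from the pull request: a plain `Iterator<byte[]>` stands in for the `RocksIterator`, keys are decoded as UTF-8 strings instead of going through a `TypeSerializer`, and the composite layout (key-group prefix, key bytes, namespace bytes, no length markers) is an assumption made for illustration. `NamespaceFilteringIterator` and `compose` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

final class NamespaceFilteringIterator implements Iterator<String> {
    private final Iterator<byte[]> rawKeys;   // stand-in for a RocksIterator
    private final int keyGroupPrefixBytes;
    private final byte[] namespaceBytes;
    private String nextKey;                   // look-ahead slot, null when nothing is buffered

    NamespaceFilteringIterator(Iterator<byte[]> rawKeys, int keyGroupPrefixBytes, byte[] namespaceBytes) {
        this.rawKeys = rawKeys;
        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
        this.namespaceBytes = namespaceBytes;
    }

    @Override
    public boolean hasNext() {
        final int basicLength = keyGroupPrefixBytes + namespaceBytes.length;
        // Advance until an entry of the requested namespace is buffered or the input ends.
        while (nextKey == null && rawKeys.hasNext()) {
            byte[] raw = rawKeys.next();
            if (raw.length >= basicLength && endsWith(raw, namespaceBytes)) {
                nextKey = new String(
                    raw, keyGroupPrefixBytes, raw.length - basicLength, StandardCharsets.UTF_8);
            }
        }
        return nextKey != null;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        String result = nextKey;
        nextKey = null;
        return result;
    }

    private static boolean endsWith(byte[] data, byte[] suffix) {
        int offset = data.length - suffix.length;
        for (int i = 0; i < suffix.length; i++) {
            if (data[offset + i] != suffix[i]) {
                return false;
            }
        }
        return true;
    }

    /** Builds a composite key in the assumed layout: one key-group byte, key, namespace. */
    static byte[] compose(int keyGroup, String key, String namespace) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] n = namespace.getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[1 + k.length + n.length];
        out[0] = (byte) keyGroup;
        System.arraycopy(k, 0, out, 1, k.length);
        System.arraycopy(n, 0, out, 1 + k.length, n.length);
        return out;
    }
}
```

Plain suffix matching can report false positives in this simplified layout when one namespace's bytes are a suffix of another entry's bytes. The `ambiguousKeyPossible` flag in the diff appears to control length markers intended to rule that out, and the sketch leaves them out.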




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372934#comment-16372934
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169991080
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -161,107 +160,131 @@ protected void writeKeyWithGroupAndNamespace(
Preconditions.checkNotNull(key, "No key set. This method should 
not be called outside of a keyed context.");
 
keySerializationStream.reset();
-   writeKeyGroup(keyGroup, keySerializationDataOutputView);
-   writeKey(key, keySerializationStream, 
keySerializationDataOutputView);
-   writeNameSpace(namespace, keySerializationStream, 
keySerializationDataOutputView);
+   AbstractRocksDBUtils.writeKeyGroup(keyGroup, 
backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
+   AbstractRocksDBUtils.writeKey(key, backend.getKeySerializer(), 
keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
+   AbstractRocksDBUtils.writeNameSpace(namespace, 
namespaceSerializer, keySerializationStream, keySerializationDataOutputView, 
ambiguousKeyPossible);
}
 
-   private void writeKeyGroup(
-   int keyGroup,
-   DataOutputView keySerializationDateDataOutputView) 
throws IOException {
-   for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
-   keySerializationDateDataOutputView.writeByte(keyGroup 
>>> (i << 3));
-   }
+   protected Tuple3 
readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, 
DataInputView inputView) throws IOException {
+   int keyGroup = 
AbstractRocksDBUtils.readKeyGroup(backend.getKeyGroupPrefixBytes(), inputView);
+   K key = 
AbstractRocksDBUtils.readKey(backend.getKeySerializer(), inputStream, 
inputView, ambiguousKeyPossible);
+   N namespace = 
AbstractRocksDBUtils.readNamespace(namespaceSerializer, inputStream, inputView, 
ambiguousKeyPossible);
+
+   return new Tuple3<>(keyGroup, key, namespace);
}
 
-   private void writeKey(
-   K key,
-   ByteArrayOutputStreamWithPos keySerializationStream,
-   DataOutputView keySerializationDataOutputView) throws 
IOException {
-   //write key
-   int beforeWrite = keySerializationStream.getPosition();
-   backend.getKeySerializer().serialize(key, 
keySerializationDataOutputView);
-
-   if (ambiguousKeyPossible) {
-   //write size of key
-   writeLengthFrom(beforeWrite, keySerializationStream,
-   keySerializationDataOutputView);
+   /**
+* Utils for RocksDB state serialization and deserialization.
+*/
+   static class AbstractRocksDBUtils {
--- End diff --

Addressing
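
The key-group prefix handling that the diff moves into `AbstractRocksDBUtils` can be shown as a standalone round trip. The write loop mirrors the removed `writeKeyGroup` lines above; the read loop is an assumed inverse, since the body of `readKeyGroup` is not shown in the diff. `KeyGroupPrefixSketch`, `encode` and `decode` are hypothetical names, and `java.io` streams stand in for Flink's `DataOutputView` and `DataInputView`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class KeyGroupPrefixSketch {

    /** Writes the key group as prefixBytes bytes, most significant byte first. */
    static void writeKeyGroup(int keyGroup, int prefixBytes, DataOutputStream out) throws IOException {
        for (int i = prefixBytes; --i >= 0;) {
            out.writeByte(keyGroup >>> (i << 3));
        }
    }

    /** Assumed inverse of writeKeyGroup: reads prefixBytes bytes, most significant first. */
    static int readKeyGroup(int prefixBytes, DataInputStream in) throws IOException {
        int keyGroup = 0;
        for (int i = 0; i < prefixBytes; ++i) {
            keyGroup = (keyGroup << 8) | (in.readByte() & 0xFF);
        }
        return keyGroup;
    }

    /** Convenience wrapper that returns the encoded prefix as a byte array. */
    static byte[] encode(int keyGroup, int prefixBytes) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writeKeyGroup(keyGroup, prefixBytes, out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Convenience wrapper that decodes a prefix produced by encode. */
    static int decode(byte[] prefix, int prefixBytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(prefix))) {
            return readKeyGroup(prefixBytes, in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For example, key group 300 with a two-byte prefix is written as `0x01 0x2C`. Because the prefix is big-endian, entries of the same key group share a common byte prefix in the composite key.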




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372936#comment-16372936
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169991126
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -253,22 +257,61 @@ public RocksDBKeyedStateBackend(
this.restoredKvStateMetaInfos = new HashMap<>();
this.materializedSstFiles = new TreeMap<>();
this.backendUID = UUID.randomUUID();
+   this.namespaceOutputStream = new 
ByteArrayOutputStreamWithPos(8);
LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
}
 
@Override
-   public  Stream getKeys(String state, N namespace) {
+   public  Stream getKeys(String state, N namespace, 
TypeSerializer namespaceSerializer) {
Tuple2 columnInfo = 
kvStateInformation.get(state);
if (columnInfo == null) {
return Stream.empty();
}
 
-   RocksIterator iterator = db.newIterator(columnInfo.f0);
-   iterator.seekToFirst();
+   RocksIterator iterator = null;
+   try {
+   iterator = db.newIterator(columnInfo.f0);
+   iterator.seekToFirst();
+
+   boolean ambiguousKeyPossible = 
AbstractRocksDBState.AbstractRocksDBUtils.isAmbiguousKeyPossible(keySerializer, 
namespaceSerializer);
+   final byte[] nameSpaceBytes;
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
-   Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
-   return targetStream.onClose(iterator::close);
+   try {
+   namespaceOutputStream.reset();
+   
AbstractRocksDBState.AbstractRocksDBUtils.writeNameSpace(
+   namespace,
+   namespaceSerializer,
+   namespaceOutputStream,
+   new 
DataOutputViewStreamWrapper(namespaceOutputStream),
+   ambiguousKeyPossible);
+   nameSpaceBytes = 
namespaceOutputStream.toByteArray();
+   } catch (IOException ex) {
+   throw new FlinkRuntimeException("Failed to get 
keys from RocksDB state backend.", ex);
+   }
+
+   final RocksIteratorWrapper iteratorWrapper = new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes,
+   ambiguousKeyPossible, nameSpaceBytes);
+
+   Stream targetStream = 
StreamSupport.stream(((Iterable)()->iteratorWrapper).spliterator(), false);
--- End diff --
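
The pattern in the last diff line, wrapping a closeable iterator in a `Stream`, can be sketched on its own. `ClosingStreamSketch` and `ResourceIterator` are hypothetical stand-ins for the backend and for `RocksIteratorWrapper`. The `onClose` hook follows the removed `targetStream.onClose(iterator::close)` line; whether the new code registers it the same way is not visible in the truncated diff.

```java
import java.util.Iterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class ClosingStreamSketch {

    /** Hypothetical stand-in for an iterator that holds a native resource. */
    static final class ResourceIterator implements Iterator<String>, AutoCloseable {
        private final Iterator<String> delegate;
        boolean closed;

        ResourceIterator(Iterator<String> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public String next() {
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true; // a real implementation would release the native iterator here
        }
    }

    /** Exposes the iterator as a sequential stream that closes it when the stream is closed. */
    static Stream<String> asStream(ResourceIterator iterator) {
        Iterable<String> iterable = () -> iterator;
        return StreamSupport.stream(iterable.spliterator(), false).onClose(iterator::close);
    }
}
```

A terminal operation such as `collect` does not close a stream by itself, so callers of such a method need a try-with-resources block (or an explicit `close()`) for the `onClose` handler to run.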

 




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372933#comment-16372933
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169991026
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +2034,87 @@ public int numStateEntries() {
return count;
}
 
-   private static class RocksIteratorWrapper implements Iterator {
+   /**
+* This class is not thread safety.
--- End diff --

 




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372929#comment-16372929
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169990722
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksIterator;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the RocksIteratorWrapper.
+ */
+public class RocksDBRocksIteratorWrapperTest {
+
+   @Rule
+   public final TemporaryFolder tmp = new TemporaryFolder();
+
+   @Test
+   public void testIterator() throws Exception{
+
+   // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == 
false
+   testIteratorHelper(IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, 128, i -> i);
+
+   // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == 
true
+   testIteratorHelper(StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 128, i -> String.valueOf(i));
+
+   // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == 
false
+   testIteratorHelper(IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, 256, i -> i);
+
+   // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == 
true
+   testIteratorHelper(StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 256, i -> String.valueOf(i));
+   }
+
+void testIteratorHelper(
+   TypeSerializer keySerializer,
+   TypeSerializer namespaceSerializer,
+   int maxKeyGroupNumber,
+   Function getKeyFunc) throws Exception {
+
+   String testStateName = "aha";
+   String namespace = "ns";
+
+   String dbPath = tmp.newFolder().getAbsolutePath();
+   String checkpointPath = tmp.newFolder().toURI().toString();
+   RocksDBStateBackend backend = new RocksDBStateBackend(new 
FsStateBackend(checkpointPath), true);
+   backend.setDbStoragePath(dbPath);
+
+   Environment env = new DummyEnvironment("TestTask", 1, 0);
+
+   RocksDBKeyedStateBackend keyedStateBackend = 
(RocksDBKeyedStateBackend) backend.createKeyedStateBackend(
+   env,
+   new JobID(),
+   "Test",
+   keySerializer,
+   maxKeyGroupNumber,
+   new KeyGroupRange(0, maxKeyGroupNumber - 1),
+

[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372928#comment-16372928
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169990649
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
 ---
@@ -0,0 +1,147 @@
+   RocksDBKeyedStateBackend keyedStateBackend = 
(RocksDBKeyedStateBackend) backend.createKeyedStateBackend(
--- End diff --

Nice catch! Addressing
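
The four cases listed in the test comments above can be reproduced with a small sketch. Both rules are assumptions inferred from those comments rather than code shown in this thread: one prefix byte is taken to suffice for up to 128 key groups, and a key is taken to be ambiguous only when both the key and the namespace serializer are variable-length. `KeyFormatSketch` is a hypothetical name, and a negative length models a variable-length serializer.

```java
final class KeyFormatSketch {

    /** Assumed rule: one prefix byte covers up to 128 key groups, otherwise two are used. */
    static int keyGroupPrefixBytes(int numberOfKeyGroups) {
        return numberOfKeyGroups > (Byte.MAX_VALUE + 1) ? 2 : 1;
    }

    /**
     * Assumed rule: the composite key is ambiguous only if neither part has a fixed length.
     * A negative length stands for a variable-length serializer.
     */
    static boolean isAmbiguousKeyPossible(int keyLength, int namespaceLength) {
        return keyLength < 0 && namespaceLength < 0;
    }
}
```

Under these assumptions, an `int` key (length 4) with a `String` namespace (length -1) is unambiguous, while `String` keys with `String` namespaces are ambiguous, which matches the serializer pairs the test passes in.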



[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372930#comment-16372930
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169990779
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
 ---
@@ -0,0 +1,147 @@

[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372927#comment-16372927
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169990518
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -253,22 +257,61 @@ public RocksDBKeyedStateBackend(
this.restoredKvStateMetaInfos = new HashMap<>();
this.materializedSstFiles = new TreeMap<>();
this.backendUID = UUID.randomUUID();
+   this.namespaceOutputStream = new 
ByteArrayOutputStreamWithPos(8);
LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
}
 
@Override
-   public  Stream getKeys(String state, N namespace) {
+   public  Stream getKeys(String state, N namespace, 
TypeSerializer namespaceSerializer) {
Tuple2 columnInfo = 
kvStateInformation.get(state);
if (columnInfo == null) {
return Stream.empty();
}
 
-   RocksIterator iterator = db.newIterator(columnInfo.f0);
-   iterator.seekToFirst();
+   RocksIterator iterator = null;
+   try {
+   iterator = db.newIterator(columnInfo.f0);
+   iterator.seekToFirst();
+
+   boolean ambiguousKeyPossible = 
AbstractRocksDBState.AbstractRocksDBUtils.isAmbiguousKeyPossible(keySerializer, 
namespaceSerializer);
+   final byte[] nameSpaceBytes;
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
-   Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
-   return targetStream.onClose(iterator::close);
+   try {
+   namespaceOutputStream.reset();
+   
AbstractRocksDBState.AbstractRocksDBUtils.writeNameSpace(
+   namespace,
+   namespaceSerializer,
+   namespaceOutputStream,
+   new 
DataOutputViewStreamWrapper(namespaceOutputStream),
+   ambiguousKeyPossible);
+   nameSpaceBytes = 
namespaceOutputStream.toByteArray();
+   } catch (IOException ex) {
+   throw new FlinkRuntimeException("Failed to get 
keys from RocksDB state backend.", ex);
+   }
+
+   final RocksIteratorWrapper iteratorWrapper = new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes,
+   ambiguousKeyPossible, nameSpaceBytes);
+
+   Stream targetStream = 
StreamSupport.stream(((Iterable)()->iteratorWrapper).spliterator(), false);
+   return targetStream.onClose(() -> {
+   try {
+   iteratorWrapper.close();
+   } catch (Exception ex) {
+   LOG.warn("Release RocksIteratorWrapper 
failed.", ex);
+   }
+   });
+   }  catch (Exception ex) {
--- End diff --

I think I misunderstood your point earlier; addressing it now.




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372925#comment-16372925
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169990041
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +2034,87 @@ public int numStateEntries() {
return count;
}
 
-   private static class RocksIteratorWrapper implements Iterator {
+   /**
+* This class is not thread safety.
+*/
+   static class RocksIteratorWrapper implements Iterator, 
AutoCloseable {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private final boolean ambiguousKeyPossible;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   boolean ambiguousKeyPossible,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
+   this.ambiguousKeyPossible = ambiguousKeyPossible;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   final int basicLength = namespaceBytesLength + 
keyGroupPrefixBytes;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   byte[] key = iterator.key();
+   if (key.length >= basicLength) {
+   if (isMatchingNameSpace(key)) {
+   
ByteArrayInputStreamWithPos inputStream =
+   new 
ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - 
keyGroupPrefixBytes);
+   
DataInputViewStreamWrapper dataInput = new 
DataInputViewStreamWrapper(inputStream);
+   K value = 
AbstractRocksDBState.AbstractRocksDBUtils.readKey(
+   keySerializer,
+   inputStream,
+   dataInput,
+   
ambiguousKeyPossible);
+   nextKey = value;
+   }
+   }
+   iterator.next();
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Failed 
to access state [" + state + "]", e);
+   }
+   }
+   return nextKey != null;
}
 
@Override
public K next() {
if (!hasNext()) {
throw new NoSuchElementException("Failed to 
access state [" + state + "]");
}
-   try {
-   byte[] key = iterator.key();
-   DataInputViewStreamWrapper dataInput = 
new DataInputViewStreamWrapper(
-   new ByteArrayInputStreamWithPos(key, 
keyGroupPrefixBytes, key.length - keyGroupPrefixBytes));
-   K value = keySerializer.deserialize(dataInput);
-   iterator.next();
-   return value;
-  

[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372923#comment-16372923
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169989983
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -207,6 +208,9 @@
/** Unique ID of this backend. */
private UUID backendUID;
 
+   /** The byte array for namespace serialization in getKeys(). */
+   private final ByteArrayOutputStreamWithPos namespaceOutputStream;
--- End diff --

I would prefer to create one of these per `getKeys(...)` call. That seems 
cleaner to me than putting it into `RocksDBKeyedStateBackend`, because it is 
only used in `getKeys()` for now.
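
A minimal sketch of the per-call variant, under loud assumptions: `NamespaceBytesSketch` and `serializeNamespace` are hypothetical names, and `DataOutputStream.writeUTF` only stands in for the real namespace `TypeSerializer`. It shows a buffer that lives for one call instead of being a field of the backend.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration; not code from the pull request.
final class NamespaceBytesSketch {

    /** Serializes the namespace into a buffer that is created and discarded per call. */
    static byte[] serializeNamespace(String namespace) {
        // Local buffer: no shared state, so no reset() and no reuse across callers.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream(8);
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            // writeUTF is a stand-in for namespaceSerializer.serialize(...).
            out.writeUTF(namespace);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }
}
```

The trade-off is one small allocation per `getKeys()` call in exchange for keeping the backend free of a field that only one method uses.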




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372921#comment-16372921
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169988602
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 ---
@@ -61,8 +61,9 @@
 * over it keys are not supported.
 * @param state State variable for which existing keys will be returned.
 * @param namespace Namespace for which existing keys will be returned.
+* @param namespaceSerializer the serializer for the namespace.
 */
-Stream getKeys(String state, N namespace);
+Stream getKeys(String state, N namespace, TypeSerializer 
namespaceSerializer);
--- End diff --

Aha, I also noticed that `RegisteredKeyedBackendStateMetaInfo` contains the 
namespace serializer. One thing confuses me, though: it looks like one 
`ColumnFamilyHandle` can correspond to multiple namespace serializers, but I 
am not sure about this, so I introduced the additional parameter. Am I 
misunderstanding something? Can one `ColumnFamilyHandle` correspond to only 
one namespace serializer?




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372816#comment-16372816
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169957582
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -253,22 +257,61 @@ public RocksDBKeyedStateBackend(
this.restoredKvStateMetaInfos = new HashMap<>();
this.materializedSstFiles = new TreeMap<>();
this.backendUID = UUID.randomUUID();
+   this.namespaceOutputStream = new 
ByteArrayOutputStreamWithPos(8);
LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
}
 
@Override
-   public  Stream getKeys(String state, N namespace) {
+   public  Stream getKeys(String state, N namespace, 
TypeSerializer namespaceSerializer) {
Tuple2 columnInfo = 
kvStateInformation.get(state);
if (columnInfo == null) {
return Stream.empty();
}
 
-   RocksIterator iterator = db.newIterator(columnInfo.f0);
-   iterator.seekToFirst();
+   RocksIterator iterator = null;
+   try {
+   iterator = db.newIterator(columnInfo.f0);
+   iterator.seekToFirst();
+
+   boolean ambiguousKeyPossible = 
AbstractRocksDBState.AbstractRocksDBUtils.isAmbiguousKeyPossible(keySerializer, 
namespaceSerializer);
+   final byte[] nameSpaceBytes;
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
-   Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
-   return targetStream.onClose(iterator::close);
+   try {
+   namespaceOutputStream.reset();
+   
AbstractRocksDBState.AbstractRocksDBUtils.writeNameSpace(
+   namespace,
+   namespaceSerializer,
+   namespaceOutputStream,
+   new 
DataOutputViewStreamWrapper(namespaceOutputStream),
+   ambiguousKeyPossible);
+   nameSpaceBytes = 
namespaceOutputStream.toByteArray();
+   } catch (IOException ex) {
+   throw new FlinkRuntimeException("Failed to get 
keys from RocksDB state backend.", ex);
+   }
+
+   final RocksIteratorWrapper iteratorWrapper = new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes,
+   ambiguousKeyPossible, nameSpaceBytes);
+
+   Stream targetStream = 
StreamSupport.stream(((Iterable)()->iteratorWrapper).spliterator(), false);
--- End diff --

I think `Spliterators.spliteratorUnknownSize(iteratorWrapper, 
Spliterator.ORDERED)` might be the cleaner way to get a stream from the 
iterator.
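
A minimal sketch of that suggestion using only JDK classes. `IteratorStreamSketch`, `toStream` and `demo` are hypothetical names for illustration, and a plain list iterator stands in for `RocksIteratorWrapper`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical illustration; not code from the pull request.
final class IteratorStreamSketch {

    /** Wraps a one-shot iterator in a sequential stream and registers a close hook. */
    static <T> Stream<T> toStream(Iterator<T> iterator, Runnable onClose) {
        // No Iterable lambda and no cast: the iterator is adapted directly.
        Spliterator<T> spliterator =
            Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED);
        return StreamSupport.stream(spliterator, false).onClose(onClose);
    }

    /** Collects the demo keys and reports whether the close hook ran. */
    static String demo() {
        AtomicBoolean closed = new AtomicBoolean(false);
        List<String> keys;
        try (Stream<String> stream =
                toStream(Arrays.asList("a", "b", "c").iterator(), () -> closed.set(true))) {
            keys = stream.collect(Collectors.toList());
        }
        return keys + " closed=" + closed.get();
    }
}
```

The close hook only runs when the stream is closed explicitly, for example through try-with-resources as in `demo()`, so callers of `getKeys()` would still need to close the returned stream to release the underlying iterator.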




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372790#comment-16372790
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169950751
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
@@ -161,107 +160,131 @@ protected void writeKeyWithGroupAndNamespace(
Preconditions.checkNotNull(key, "No key set. This method should 
not be called outside of a keyed context.");
 
keySerializationStream.reset();
-   writeKeyGroup(keyGroup, keySerializationDataOutputView);
-   writeKey(key, keySerializationStream, 
keySerializationDataOutputView);
-   writeNameSpace(namespace, keySerializationStream, 
keySerializationDataOutputView);
+   AbstractRocksDBUtils.writeKeyGroup(keyGroup, 
backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
+   AbstractRocksDBUtils.writeKey(key, backend.getKeySerializer(), 
keySerializationStream, keySerializationDataOutputView, ambiguousKeyPossible);
+   AbstractRocksDBUtils.writeNameSpace(namespace, 
namespaceSerializer, keySerializationStream, keySerializationDataOutputView, 
ambiguousKeyPossible);
}
 
-   private void writeKeyGroup(
-   int keyGroup,
-   DataOutputView keySerializationDateDataOutputView) 
throws IOException {
-   for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
-   keySerializationDateDataOutputView.writeByte(keyGroup 
>>> (i << 3));
-   }
+   protected Tuple3 
readKeyWithGroupAndNamespace(ByteArrayInputStreamWithPos inputStream, 
DataInputView inputView) throws IOException {
+   int keyGroup = 
AbstractRocksDBUtils.readKeyGroup(backend.getKeyGroupPrefixBytes(), inputView);
+   K key = 
AbstractRocksDBUtils.readKey(backend.getKeySerializer(), inputStream, 
inputView, ambiguousKeyPossible);
+   N namespace = 
AbstractRocksDBUtils.readNamespace(namespaceSerializer, inputStream, inputView, 
ambiguousKeyPossible);
+
+   return new Tuple3<>(keyGroup, key, namespace);
}
 
-   private void writeKey(
-   K key,
-   ByteArrayOutputStreamWithPos keySerializationStream,
-   DataOutputView keySerializationDataOutputView) throws 
IOException {
-   //write key
-   int beforeWrite = keySerializationStream.getPosition();
-   backend.getKeySerializer().serialize(key, 
keySerializationDataOutputView);
-
-   if (ambiguousKeyPossible) {
-   //write size of key
-   writeLengthFrom(beforeWrite, keySerializationStream,
-   keySerializationDataOutputView);
+   /**
+* Utils for RocksDB state serialization and deserialization.
+*/
+   static class AbstractRocksDBUtils {
--- End diff --

The name of this class already suggests that it would be better placed in its 
own file, maybe as `RocksDBKeySerializationUtils`. Now that the methods are 
public and used in different places, I also suggest adding a test to guard 
their behavior against accidental code changes.
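
A round-trip check of the kind such a test could contain, sketched with JDK streams only. The write loop mirrors the removed `writeKeyGroup` shown in the diff above; `KeyGroupPrefixSketch`, `roundTrip` and the body of `readKeyGroup` are assumptions for illustration, not the code under review.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration; not code from the pull request.
final class KeyGroupPrefixSketch {

    /** Writes the key group big-endian, using keyGroupPrefixBytes bytes. */
    static void writeKeyGroup(int keyGroup, int keyGroupPrefixBytes, DataOutputStream out)
            throws IOException {
        for (int i = keyGroupPrefixBytes; --i >= 0;) {
            out.writeByte(keyGroup >>> (i << 3));
        }
    }

    /** Reads back a big-endian key group of keyGroupPrefixBytes bytes (assumed inverse). */
    static int readKeyGroup(int keyGroupPrefixBytes, DataInputStream in) throws IOException {
        int keyGroup = 0;
        for (int i = 0; i < keyGroupPrefixBytes; ++i) {
            keyGroup <<= 8;
            keyGroup |= (in.readByte() & 0xFF);
        }
        return keyGroup;
    }

    /** Serializes and deserializes one key group, returning the value that was read. */
    static int roundTrip(int keyGroup, int keyGroupPrefixBytes) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writeKeyGroup(keyGroup, keyGroupPrefixBytes, new DataOutputStream(bytes));
            return readKeyGroup(
                keyGroupPrefixBytes,
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A guard test would assert that `roundTrip(g, n)` returns `g` for every key group that fits into `n` prefix bytes, covering both the 1-byte and the 2-byte prefix.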




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372788#comment-16372788
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169949734
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +2034,87 @@ public int numStateEntries() {
return count;
}
 
-   private static class RocksIteratorWrapper implements Iterator {
+   /**
+* This class is not thread safety.
--- End diff --

`thread safe`, without the `ty`




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372785#comment-16372785
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169949298
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +2034,87 @@ public int numStateEntries() {
return count;
}
 
-   private static class RocksIteratorWrapper implements Iterator {
+   /**
+* This class is not thread safety.
+*/
+   static class RocksIteratorWrapper implements Iterator, 
AutoCloseable {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private final boolean ambiguousKeyPossible;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   boolean ambiguousKeyPossible,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
+   this.ambiguousKeyPossible = ambiguousKeyPossible;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   final int basicLength = namespaceBytesLength + 
keyGroupPrefixBytes;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   byte[] key = iterator.key();
+   if (key.length >= basicLength) {
--- End diff --

The outer `if` could become part of the matcher function, or at least we can 
combine both `if` statements with `&&`.
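
A minimal sketch of the first option, with the length check folded into the matcher. The body of `isMatchingNameSpace` is not visible in this diff, so the suffix comparison and the key layout `[key group prefix][key][namespace]` are assumptions, as are the class name and the explicit parameters.

```java
// Hypothetical illustration; not code from the pull request.
final class NamespaceMatchSketch {

    /**
     * Returns true if the key is long enough to hold the key group prefix plus
     * the namespace, and its trailing bytes equal namespaceBytes.
     */
    static boolean isMatchingNameSpace(byte[] key, int keyGroupPrefixBytes, byte[] namespaceBytes) {
        final int namespaceLength = namespaceBytes.length;
        // Formerly the outer `if` in hasNext(): reject keys that are too short.
        if (key.length < keyGroupPrefixBytes + namespaceLength) {
            return false;
        }
        // Compare the tail of the key with the serialized namespace.
        final int offset = key.length - namespaceLength;
        for (int i = 0; i < namespaceLength; i++) {
            if (key[offset + i] != namespaceBytes[i]) {
                return false;
            }
        }
        return true;
    }
}
```

With the check inside the matcher, `hasNext()` needs only a single `if (isMatchingNameSpace(...))` around the key deserialization.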




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372783#comment-16372783
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169948460
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksIterator;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the RocksIteratorWrapper.
+ */
+public class RocksDBRocksIteratorWrapperTest {
+
+   @Rule
+   public final TemporaryFolder tmp = new TemporaryFolder();
+
+   @Test
+   public void testIterator() throws Exception{
+
+   // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == 
false
+   testIteratorHelper(IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, 128, i -> i);
+
+   // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == 
true
+   testIteratorHelper(StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 128, i -> String.valueOf(i));
+
+   // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == 
false
+   testIteratorHelper(IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, 256, i -> i);
+
+   // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == 
true
+   testIteratorHelper(StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 256, i -> String.valueOf(i));
+   }
+
+void testIteratorHelper(
+   TypeSerializer keySerializer,
+   TypeSerializer namespaceSerializer,
+   int maxKeyGroupNumber,
+   Function getKeyFunc) throws Exception {
+
+   String testStateName = "aha";
+   String namespace = "ns";
+
+   String dbPath = tmp.newFolder().getAbsolutePath();
+   String checkpointPath = tmp.newFolder().toURI().toString();
+   RocksDBStateBackend backend = new RocksDBStateBackend(new 
FsStateBackend(checkpointPath), true);
+   backend.setDbStoragePath(dbPath);
+
+   Environment env = new DummyEnvironment("TestTask", 1, 0);
+
+   RocksDBKeyedStateBackend keyedStateBackend = 
(RocksDBKeyedStateBackend) backend.createKeyedStateBackend(
+   env,
+   new JobID(),
+   "Test",
+   keySerializer,
+   maxKeyGroupNumber,
+   new KeyGroupRange(0, maxKeyGroupNumber - 1),
+   

[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372780#comment-16372780
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169948011
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksIterator;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the RocksIteratorWrapper.
+ */
+public class RocksDBRocksIteratorWrapperTest {
+
+   @Rule
+   public final TemporaryFolder tmp = new TemporaryFolder();
+
+   @Test
+   public void testIterator() throws Exception{
+
+   // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == 
false
+   testIteratorHelper(IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, 128, i -> i);
+
+   // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == 
true
+   testIteratorHelper(StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 128, i -> String.valueOf(i));
+
+   // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == 
false
+   testIteratorHelper(IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, 256, i -> i);
+
+   // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == 
true
+   testIteratorHelper(StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 256, i -> String.valueOf(i));
+   }
+
+void testIteratorHelper(
+   TypeSerializer keySerializer,
+   TypeSerializer namespaceSerializer,
+   int maxKeyGroupNumber,
+   Function getKeyFunc) throws Exception {
+
+   String testStateName = "aha";
+   String namespace = "ns";
+
+   String dbPath = tmp.newFolder().getAbsolutePath();
+   String checkpointPath = tmp.newFolder().toURI().toString();
+   RocksDBStateBackend backend = new RocksDBStateBackend(new 
FsStateBackend(checkpointPath), true);
+   backend.setDbStoragePath(dbPath);
+
+   Environment env = new DummyEnvironment("TestTask", 1, 0);
+
+   RocksDBKeyedStateBackend keyedStateBackend = 
(RocksDBKeyedStateBackend) backend.createKeyedStateBackend(
+   env,
+   new JobID(),
+   "Test",
+   keySerializer,
+   maxKeyGroupNumber,
+   new KeyGroupRange(0, maxKeyGroupNumber - 1),
+   

[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372778#comment-16372778
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169947880
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBRocksIteratorWrapperTest.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksIterator;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the RocksIteratorWrapper.
+ */
+public class RocksDBRocksIteratorWrapperTest {
+
+   @Rule
+   public final TemporaryFolder tmp = new TemporaryFolder();
+
+   @Test
+   public void testIterator() throws Exception{
+
+   // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == 
false
+   testIteratorHelper(IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, 128, i -> i);
+
+   // test for keyGroupPrefixBytes == 1 && ambiguousKeyPossible == 
true
+   testIteratorHelper(StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 128, i -> String.valueOf(i));
+
+   // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == 
false
+   testIteratorHelper(IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, 256, i -> i);
+
+   // test for keyGroupPrefixBytes == 2 && ambiguousKeyPossible == 
true
+   testIteratorHelper(StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, 256, i -> String.valueOf(i));
+   }
+
+void testIteratorHelper(
+   TypeSerializer keySerializer,
+   TypeSerializer namespaceSerializer,
+   int maxKeyGroupNumber,
+   Function getKeyFunc) throws Exception {
+
+   String testStateName = "aha";
+   String namespace = "ns";
+
+   String dbPath = tmp.newFolder().getAbsolutePath();
+   String checkpointPath = tmp.newFolder().toURI().toString();
+   RocksDBStateBackend backend = new RocksDBStateBackend(new 
FsStateBackend(checkpointPath), true);
+   backend.setDbStoragePath(dbPath);
+
+   Environment env = new DummyEnvironment("TestTask", 1, 0);
+
+   RocksDBKeyedStateBackend keyedStateBackend = 
(RocksDBKeyedStateBackend) backend.createKeyedStateBackend(
--- End diff --

This object must be disposed at the end of the test; otherwise other tests 
could eventually fail with JVM crashes.
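
A minimal sketch of the disposal pattern requested above. It does not use the real `RocksDBKeyedStateBackend`; `FakeNativeBackend`, `numStateEntries()` and `runHelper()` are hypothetical stand-ins, and the only point shown is that `dispose()` sits in a `finally` block so it runs even when an assertion in the test body throws.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class DisposeInFinallySketch {

    // Hypothetical stand-in for an object that owns native (off-heap) resources.
    static final class FakeNativeBackend {
        // Number of instances created but not yet disposed.
        static final AtomicInteger OPEN = new AtomicInteger();
        private boolean disposed;

        FakeNativeBackend() {
            OPEN.incrementAndGet();
        }

        int numStateEntries() {
            if (disposed) {
                throw new IllegalStateException("backend already disposed");
            }
            return 0;
        }

        void dispose() {
            // Idempotent: a second call is a no-op.
            if (!disposed) {
                disposed = true;
                OPEN.decrementAndGet();
            }
        }
    }

    // Shape of a test helper: the backend is released on every exit path.
    static int runHelper() {
        FakeNativeBackend backend = new FakeNativeBackend();
        try {
            return backend.numStateEntries();
        } finally {
            backend.dispose();
        }
    }

    public static void main(String[] args) {
        runHelper();
        System.out.println("open backends: " + FakeNativeBackend.OPEN.get());
    }
}
```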



[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372771#comment-16372771
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169946803
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -253,22 +257,61 @@ public RocksDBKeyedStateBackend(
this.restoredKvStateMetaInfos = new HashMap<>();
this.materializedSstFiles = new TreeMap<>();
this.backendUID = UUID.randomUUID();
+   this.namespaceOutputStream = new 
ByteArrayOutputStreamWithPos(8);
LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
}
 
@Override
-   public  Stream getKeys(String state, N namespace) {
+   public  Stream getKeys(String state, N namespace, 
TypeSerializer namespaceSerializer) {
Tuple2 columnInfo = 
kvStateInformation.get(state);
if (columnInfo == null) {
return Stream.empty();
}
 
-   RocksIterator iterator = db.newIterator(columnInfo.f0);
-   iterator.seekToFirst();
+   RocksIterator iterator = null;
+   try {
+   iterator = db.newIterator(columnInfo.f0);
+   iterator.seekToFirst();
+
+   boolean ambiguousKeyPossible = 
AbstractRocksDBState.AbstractRocksDBUtils.isAmbiguousKeyPossible(keySerializer, 
namespaceSerializer);
+   final byte[] nameSpaceBytes;
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
-   Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
-   return targetStream.onClose(iterator::close);
+   try {
+   namespaceOutputStream.reset();
+   
AbstractRocksDBState.AbstractRocksDBUtils.writeNameSpace(
+   namespace,
+   namespaceSerializer,
+   namespaceOutputStream,
+   new 
DataOutputViewStreamWrapper(namespaceOutputStream),
+   ambiguousKeyPossible);
+   nameSpaceBytes = 
namespaceOutputStream.toByteArray();
+   } catch (IOException ex) {
+   throw new FlinkRuntimeException("Failed to get 
keys from RocksDB state backend.", ex);
+   }
+
+   final RocksIteratorWrapper iteratorWrapper = new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes,
+   ambiguousKeyPossible, nameSpaceBytes);
+
+   Stream targetStream = 
StreamSupport.stream(((Iterable)()->iteratorWrapper).spliterator(), false);
+   return targetStream.onClose(() -> {
+   try {
+   iteratorWrapper.close();
+   } catch (Exception ex) {
+   LOG.warn("Release RocksIteratorWrapper 
failed.", ex);
+   }
+   });
+   }  catch (Exception ex) {
--- End diff --

As mentioned in my previous comment, we can solve this without any 
`try-catch` here: just create the native iterator further down, where no more 
exceptions can happen, i.e.
```
(...)
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();

final RocksIteratorWrapper<K> iteratorWrapper = new RocksIteratorWrapper<>(
    iterator, state, keySerializer, keyGroupPrefixBytes, ambiguousKeyPossible, nameSpaceBytes);

Stream<K> targetStream = StreamSupport.stream(((Iterable<K>) () -> iteratorWrapper).spliterator(), false);
return targetStream.onClose(iteratorWrapper::close);
```
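
The ordering argument in this comment can be sketched in plain Java, under stated assumptions. `FakeNativeIterator`, `serialize`, `endsWith` and `keys` are hypothetical names, `DataOutputStream.writeUTF` stands in for Flink's namespace serialization, and keys are treated as raw byte arrays whose suffix is the serialized namespace. The fallible serialization step runs first; the iterator is opened only afterwards, so no `try-catch` is needed to clean it up.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class AcquireLastSketch {
    // Counts iterators that were opened but not yet closed.
    static int openIterators = 0;

    // Hypothetical stand-in for a native iterator that must be closed.
    static final class FakeNativeIterator implements Iterator<byte[]> {
        private final Iterator<byte[]> delegate;

        FakeNativeIterator(List<byte[]> data) {
            this.delegate = data.iterator();
            openIterators++;
        }

        @Override public boolean hasNext() { return delegate.hasNext(); }
        @Override public byte[] next() { return delegate.next(); }
        void close() { openIterators--; }
    }

    // Fallible step: may throw IOException (or NullPointerException for null input).
    static byte[] serialize(String namespace) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream(8);
        new DataOutputStream(out).writeUTF(namespace);
        return out.toByteArray();
    }

    // True if the last suffix.length bytes of key equal suffix.
    static boolean endsWith(byte[] key, byte[] suffix) {
        if (key.length < suffix.length) {
            return false;
        }
        for (int i = 1; i <= suffix.length; i++) {
            if (key[key.length - i] != suffix[suffix.length - i]) {
                return false;
            }
        }
        return true;
    }

    static Stream<byte[]> keys(List<byte[]> rawKeys, String namespace) {
        // 1) Everything that can throw happens before a resource exists.
        final byte[] namespaceBytes;
        try {
            namespaceBytes = serialize(namespace);
        } catch (IOException ex) {
            throw new UncheckedIOException("Failed to serialize namespace.", ex);
        }

        // 2) Only now open the iterator; the stream's close handler releases it.
        FakeNativeIterator iterator = new FakeNativeIterator(rawKeys);
        Iterable<byte[]> iterable = () -> iterator;
        return StreamSupport.stream(iterable.spliterator(), false)
            .filter(key -> endsWith(key, namespaceBytes))
            .onClose(iterator::close);
    }

    public static void main(String[] args) {
        List<byte[]> data = Arrays.asList(
            new byte[] {1, 0, 2, 'n', 's'},
            new byte[] {2, 0, 2, 'x', 'y'});
        try (Stream<byte[]> stream = keys(data, "ns")) {
            System.out.println("matching keys: " + stream.count());
        }
        System.out.println("open iterators: " + openIterators);
    }
}
```

Callers are expected to close the returned stream, for example with try-with-resources as in `main`, since that is what triggers the `onClose` handler.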



[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372768#comment-16372768
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169946359
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -253,22 +257,61 @@ public RocksDBKeyedStateBackend(
this.restoredKvStateMetaInfos = new HashMap<>();
this.materializedSstFiles = new TreeMap<>();
this.backendUID = UUID.randomUUID();
+   this.namespaceOutputStream = new 
ByteArrayOutputStreamWithPos(8);
LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
}
 
@Override
-   public  Stream getKeys(String state, N namespace) {
+   public  Stream getKeys(String state, N namespace, 
TypeSerializer namespaceSerializer) {
Tuple2 columnInfo = 
kvStateInformation.get(state);
if (columnInfo == null) {
return Stream.empty();
}
 
-   RocksIterator iterator = db.newIterator(columnInfo.f0);
-   iterator.seekToFirst();
+   RocksIterator iterator = null;
+   try {
+   iterator = db.newIterator(columnInfo.f0);
+   iterator.seekToFirst();
+
+   boolean ambiguousKeyPossible = 
AbstractRocksDBState.AbstractRocksDBUtils.isAmbiguousKeyPossible(keySerializer, 
namespaceSerializer);
+   final byte[] nameSpaceBytes;
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
-   Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
-   return targetStream.onClose(iterator::close);
+   try {
+   namespaceOutputStream.reset();
+   
AbstractRocksDBState.AbstractRocksDBUtils.writeNameSpace(
+   namespace,
+   namespaceSerializer,
+   namespaceOutputStream,
+   new 
DataOutputViewStreamWrapper(namespaceOutputStream),
+   ambiguousKeyPossible);
+   nameSpaceBytes = 
namespaceOutputStream.toByteArray();
+   } catch (IOException ex) {
+   throw new FlinkRuntimeException("Failed to get 
keys from RocksDB state backend.", ex);
+   }
+
+   final RocksIteratorWrapper iteratorWrapper = new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes,
+   ambiguousKeyPossible, nameSpaceBytes);
+
+   Stream targetStream = 
StreamSupport.stream(((Iterable)()->iteratorWrapper).spliterator(), false);
+   return targetStream.onClose(() -> {
+   try {
+   iteratorWrapper.close();
+   } catch (Exception ex) {
+   LOG.warn("Release RocksIteratorWrapper 
failed.", ex);
+   }
+   });
+   }  catch (Exception ex) {
--- End diff --

As mentioned in my previous comment, this `try-catch` block is not required 
when we create the iterator further down, where no more exceptions can happen, 
i.e.
```
(...)
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();

final RocksIteratorWrapper<K> iteratorWrapper = new RocksIteratorWrapper<>(
    iterator, state, keySerializer, keyGroupPrefixBytes, ambiguousKeyPossible, nameSpaceBytes);

Stream<K> targetStream = StreamSupport.stream(((Iterable<K>) () -> iteratorWrapper).spliterator(), false);
return targetStream.onClose(iterator::close);
```



[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372711#comment-16372711
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169931821
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +2034,87 @@ public int numStateEntries() {
return count;
}
 
-   private static class RocksIteratorWrapper implements Iterator {
+   /**
+* This class is not thread safety.
+*/
+   static class RocksIteratorWrapper implements Iterator, 
AutoCloseable {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private final boolean ambiguousKeyPossible;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   boolean ambiguousKeyPossible,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
+   this.ambiguousKeyPossible = ambiguousKeyPossible;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   final int basicLength = namespaceBytesLength + 
keyGroupPrefixBytes;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   byte[] key = iterator.key();
+   if (key.length >= basicLength) {
+   if (isMatchingNameSpace(key)) {
+   
ByteArrayInputStreamWithPos inputStream =
+   new 
ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - 
keyGroupPrefixBytes);
+   
DataInputViewStreamWrapper dataInput = new 
DataInputViewStreamWrapper(inputStream);
+   K value = 
AbstractRocksDBState.AbstractRocksDBUtils.readKey(
+   keySerializer,
+   inputStream,
+   dataInput,
+   
ambiguousKeyPossible);
+   nextKey = value;
+   }
+   }
+   iterator.next();
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Failed 
to access state [" + state + "]", e);
+   }
+   }
+   return nextKey != null;
}
 
@Override
public K next() {
if (!hasNext()) {
throw new NoSuchElementException("Failed to 
access state [" + state + "]");
}
-   try {
-   byte[] key = iterator.key();
-   DataInputViewStreamWrapper dataInput = 
new DataInputViewStreamWrapper(
-   new ByteArrayInputStreamWithPos(key, 
keyGroupPrefixBytes, key.length - keyGroupPrefixBytes));
-   K value = keySerializer.deserialize(dataInput);
-   iterator.next();
-   return value;
- 
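
The `hasNext()` change in the diff above follows a look-ahead pattern: advance the underlying iterator until an entry matches, buffer it, and hand it out in `next()`. A minimal, self-contained sketch of that pattern follows. `FilteringIterator` and its `Predicate` parameter are hypothetical and not part of the pull request, and the sketch assumes elements are never `null`, because `null` marks an empty buffer.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

final class LookAheadFilterSketch {

    // Not thread-safe: hasNext() mutates the buffered element.
    static final class FilteringIterator<T> implements Iterator<T> {
        private final Iterator<T> source;
        private final Predicate<T> matches;
        private T nextItem; // null means "nothing buffered yet"

        FilteringIterator(Iterator<T> source, Predicate<T> matches) {
            this.source = source;
            this.matches = matches;
        }

        @Override
        public boolean hasNext() {
            // Skip non-matching entries; stop at the first match or at the end.
            while (nextItem == null && source.hasNext()) {
                T candidate = source.next();
                if (matches.test(candidate)) {
                    nextItem = candidate;
                }
            }
            return nextItem != null;
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T result = nextItem;
            nextItem = null; // clear the buffer so hasNext() advances again
            return result;
        }
    }

    public static void main(String[] args) {
        Iterator<String> it = new FilteringIterator<>(
            Arrays.asList("a#ns1", "b#ns2", "c#ns1").iterator(),
            entry -> entry.endsWith("#ns1"));
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

Because the match is buffered, calling `hasNext()` repeatedly without `next()` does not skip entries.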

[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372613#comment-16372613
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169905515
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -207,6 +208,9 @@
/** Unique ID of this backend. */
private UUID backendUID;
 
+   /** The byte array for namespace serialization in getKeys(). */
+   private final ByteArrayOutputStreamWithPos namespaceOutputStream;
--- End diff --

It feels like the scope of this member is too broad. While this maximizes 
caching, I wonder whether creating one of these per `getKeys(...)` call would 
be cleaner and still efficient enough. What do you think?
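
A rough sketch of the per-call alternative, assuming plain `java.io` streams in place of `ByteArrayOutputStreamWithPos` and a hypothetical `serializeNamespace` helper. The buffer lives only for the duration of one call, so nothing has to be reset and no state is shared between invocations.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class PerCallBufferSketch {

    // A small buffer is allocated per call instead of being kept as a field.
    static byte[] serializeNamespace(String namespace) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(8);
        try (DataOutputStream view = new DataOutputStream(out)) {
            view.writeUTF(namespace);
        } catch (IOException ex) {
            throw new UncheckedIOException("Failed to serialize namespace.", ex);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(serializeNamespace("ns").length + " bytes");
    }
}
```

The trade-off is one short-lived allocation per call versus a long-lived mutable field on the backend.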




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16372612#comment-16372612
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169904243
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 ---
@@ -61,8 +61,9 @@
 * over it keys are not supported.
 * @param state State variable for which existing keys will be returned.
 * @param namespace Namespace for which existing keys will be returned.
+* @param namespaceSerializer the serializer for the namespace.
 */
-Stream getKeys(String state, N namespace);
+Stream getKeys(String state, N namespace, TypeSerializer 
namespaceSerializer);
--- End diff --

I noticed that introducing this additional parameter is actually not 
required. It is only used in RocksDB, where we can also get the namespace 
serializer in `getKeys(...)` via
```
Tuple2 columnInfo = kvStateInformation.get(state);
if (columnInfo == null) {
    return Stream.empty();
}
...
columnInfo.f1.getNamespaceSerializer();
```
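
To illustrate the idea without Flink's types, here is a small sketch in which the backend looks the serializer up from what was registered for the state, so callers pass only the state name and namespace. `NamespaceSerializer`, `StateMetaInfo`, `register` and `namespaceBytes` are hypothetical names invented for this example; only the lookup-by-state-name shape mirrors the snippet above.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class SerializerLookupSketch {

    // Hypothetical, much simplified serializer interface.
    interface NamespaceSerializer<N> {
        byte[] serialize(N namespace);
    }

    // Hypothetical per-state meta information kept by the backend.
    static final class StateMetaInfo {
        final NamespaceSerializer<?> namespaceSerializer;

        StateMetaInfo(NamespaceSerializer<?> namespaceSerializer) {
            this.namespaceSerializer = namespaceSerializer;
        }
    }

    private final Map<String, StateMetaInfo> kvStateInformation = new HashMap<>();

    <N> void register(String state, NamespaceSerializer<N> serializer) {
        kvStateInformation.put(state, new StateMetaInfo(serializer));
    }

    // No serializer parameter: it is taken from the registered meta info.
    <N> Optional<byte[]> namespaceBytes(String state, N namespace) {
        StateMetaInfo info = kvStateInformation.get(state);
        if (info == null) {
            return Optional.empty(); // unknown state, nothing to iterate
        }
        @SuppressWarnings("unchecked")
        NamespaceSerializer<N> serializer = (NamespaceSerializer<N>) info.namespaceSerializer;
        return Optional.of(serializer.serialize(namespace));
    }

    public static void main(String[] args) {
        SerializerLookupSketch backend = new SerializerLookupSketch();
        backend.register("aha", (String ns) -> ns.getBytes(StandardCharsets.UTF_8));
        System.out.println(backend.namespaceBytes("aha", "ns").isPresent());
        System.out.println(backend.namespaceBytes("unknown", "ns").isPresent());
    }
}
```

The unchecked cast is the price of the lookup: the caller must use the same namespace type the state was registered with.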




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371366#comment-16371366
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5518
  
I have addressed all the comments. Could you please have another look? cc 
@StefanRRichter @aljoscha 




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370209#comment-16370209
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169371872
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
+   for (int i = 1; i <= 
namespaceBytesLength; ++i) {
+   if (key[key.length - i] 
!= namespaceBytes[namespaceBytesLength - i]) {
+   namespaceValid 
= false;
+   break;
+   }
+   }
+   if (namespaceValid) {
+   
DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(
--- End diff --

Oh yes, sorry, you are right. I was confused :-)




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370206#comment-16370206
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r16937
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
+   for (int i = 1; i <= 
namespaceBytesLength; ++i) {
+   if (key[key.length - i] 
!= namespaceBytes[namespaceBytesLength - i]) {
+   namespaceValid 
= false;
+   break;
+   }
+   }
+   if (namespaceValid) {
+   
DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(
--- End diff --

Sorry, I am a bit confused by "`ByteArrayInputStreamWithPos` can also grow 
internally?" Do you mean `ByteArrayOutputStreamWithPos`?




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370195#comment-16370195
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169369280
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
+   for (int i = 1; i <= 
namespaceBytesLength; ++i) {
+   if (key[key.length - i] 
!= namespaceBytes[namespaceBytesLength - i]) {
+   namespaceValid 
= false;
+   break;
+   }
+   }
+   if (namespaceValid) {
+   
DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(
+   new 
ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - 
keyGroupPrefixBytes));
+   K value = 
keySerializer.deserialize(dataInput);
+   if 
(dataInput.available() == namespaceBytesLength) {
--- End diff --

You are right! I am now inclined to remove it ...




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370197#comment-16370197
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169369352
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ---
@@ -211,24 +211,55 @@ protected CheckpointStreamFactory 
createStreamFactory() throws Exception {
 
@Test
public void testGetKeys() throws Exception {
-   final int elementsToTest = 1000;
+   final int namespace1ElementsNum = 1000;
+   final int namespace2ElementsNum = 1000;
String fieldName = "get-keys-test";
AbstractKeyedStateBackend backend = 
createKeyedBackend(IntSerializer.INSTANCE);
try {
-   ValueState keyedState = 
backend.getOrCreateKeyedState(
-   VoidNamespaceSerializer.INSTANCE,
-   new ValueStateDescriptor<>(fieldName, 
IntSerializer.INSTANCE));
-   ((InternalValueState) 
keyedState).setCurrentNamespace(VoidNamespace.INSTANCE);
+   final String ns1 = "ns1";
+   ValueState keyedState1 = 
backend.getPartitionedState(
+   ns1,
+   StringSerializer.INSTANCE,
+   new ValueStateDescriptor<>(fieldName, 
IntSerializer.INSTANCE)
+   );
+
+   ((InternalValueState) 
keyedState1).setCurrentNamespace(ns1);
+
+   for (int key = 0; key < namespace1ElementsNum; key++) {
+   backend.setCurrentKey(key);
+   keyedState1.update(key * 2);
+   }
+
+   ValueState keyedState2 = 
backend.getPartitionedState(
+   ns1,
--- End diff --

addressed.




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370192#comment-16370192
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169368766
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
--- End diff --

addressed.




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370191#comment-16370191
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169368679
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
--- End diff --

Added a unit test `RocksDBRocksIteratorWrapperTest`.




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370193#comment-16370193
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169368813
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
--- End diff --

addressed.




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370190#comment-16370190
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169368440
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
--- End diff --

Good point, addressed.




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370189#comment-16370189
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169368355
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,9 +267,16 @@ public RocksDBKeyedStateBackend(
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
-   Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
-   return targetStream.onClose(iterator::close);
+   try {
+   ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(8);
+   namespaceSerializer.serialize(namespace, new 
DataOutputViewStreamWrapper(outputStream));
+   final byte[] namespaceBytes = 
outputStream.toByteArray();
+   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, 
namespaceBytes);
+   Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
+   return targetStream.onClose(iterator::close);
+   } catch (IOException ex) {
+   throw new FlinkRuntimeException("Failed to get keys 
from RocksDB state backend.", ex);
--- End diff --

Nice catch! Addressed.




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370184#comment-16370184
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169367934
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,9 +267,16 @@ public RocksDBKeyedStateBackend(
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
-   Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
-   return targetStream.onClose(iterator::close);
+   try {
+   ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(8);
--- End diff --

addressed.




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370183#comment-16370183
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169367889
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 ---
@@ -62,7 +62,7 @@
 * @param state State variable for which existing keys will be returned.
 * @param namespace Namespace for which existing keys will be returned.
 */
-Stream getKeys(String state, N namespace);
+Stream getKeys(String state, N namespace, TypeSerializer 
namespaceSerializer);
--- End diff --

addressed.




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369171#comment-16369171
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169065861
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ---
@@ -211,24 +211,55 @@ protected CheckpointStreamFactory 
createStreamFactory() throws Exception {
 
@Test
public void testGetKeys() throws Exception {
-   final int elementsToTest = 1000;
+   final int namespace1ElementsNum = 1000;
+   final int namespace2ElementsNum = 1000;
String fieldName = "get-keys-test";
AbstractKeyedStateBackend backend = 
createKeyedBackend(IntSerializer.INSTANCE);
try {
-   ValueState keyedState = 
backend.getOrCreateKeyedState(
-   VoidNamespaceSerializer.INSTANCE,
-   new ValueStateDescriptor<>(fieldName, 
IntSerializer.INSTANCE));
-   ((InternalValueState) 
keyedState).setCurrentNamespace(VoidNamespace.INSTANCE);
+   final String ns1 = "ns1";
+   ValueState keyedState1 = 
backend.getPartitionedState(
+   ns1,
+   StringSerializer.INSTANCE,
+   new ValueStateDescriptor<>(fieldName, 
IntSerializer.INSTANCE)
+   );
+
+   ((InternalValueState) 
keyedState1).setCurrentNamespace(ns1);
+
+   for (int key = 0; key < namespace1ElementsNum; key++) {
+   backend.setCurrentKey(key);
+   keyedState1.update(key * 2);
+   }
+
+   ValueState keyedState2 = 
backend.getPartitionedState(
+   ns1,
--- End diff --

If you give `ns2` here you don't have to call `setCurrentNamespace()` later.




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369129#comment-16369129
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169074775
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
+   for (int i = 1; i <= 
namespaceBytesLength; ++i) {
+   if (key[key.length - i] 
!= namespaceBytes[namespaceBytesLength - i]) {
+   namespaceValid 
= false;
+   break;
+   }
+   }
+   if (namespaceValid) {
+   
DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(
--- End diff --

This creates a lot of short-lived objects, and the 
`ByteArrayInputStreamWithPos` can also grow internally. Could we at least 
always reuse the same `ByteArrayInputStreamWithPos`? That would also let the 
internal array eventually settle at a good size.
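
To illustrate the reuse idea, here is a minimal sketch built only on `java.io`. `ReusableByteArrayInput`, its `setBuffer` method, and `KeyDecoder` are hypothetical stand-ins rather than Flink classes, and the key layout (a key-group prefix followed by a 4-byte int) is an assumption made for the example:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical resettable stream; stands in for a reusable ByteArrayInputStreamWithPos. */
final class ReusableByteArrayInput extends ByteArrayInputStream {
    ReusableByteArrayInput() {
        super(new byte[0]);
    }

    /** Points the stream at new data without allocating a new stream object. */
    void setBuffer(byte[] data, int offset, int length) {
        this.buf = data;
        this.pos = offset;
        this.mark = offset;
        this.count = offset + length;
    }
}

final class KeyDecoder {
    // Allocated once and shared by every call to decodeIntKey.
    private final ReusableByteArrayInput input = new ReusableByteArrayInput();
    private final DataInputStream view = new DataInputStream(input);

    /** Reads the 4-byte int that follows the key-group prefix (assumed layout). */
    int decodeIntKey(byte[] rawKey, int keyGroupPrefixBytes) {
        input.setBuffer(rawKey, keyGroupPrefixBytes, rawKey.length - keyGroupPrefixBytes);
        try {
            return view.readInt();
        } catch (IOException ex) {
            throw new UncheckedIOException("Failed to decode key.", ex);
        }
    }
}
```

Both the stream and its `DataInputStream` view are created once per decoder, so iterating over many keys allocates no per-key stream objects. Like the wrapper in the diff, such a decoder is not thread-safe.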




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369134#comment-16369134
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169072232
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
--- End diff --

One change I would suggest, to something that was not introduced in this PR: 
we could make `RocksIteratorWrapper` implement `AutoCloseable` and use that 
close method instead of calling close directly on the RocksDB iterator where 
this is used. I think this is cleaner, because the wrapper should own the Rocks 
iterator.
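
A rough sketch of that ownership pattern follows. `FakeNativeIterator` is a hypothetical stand-in for the native RocksDB iterator, and `OwningIteratorWrapper` is an illustrative name, not the class from the PR:

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/** Hypothetical stand-in for a native iterator such as RocksIterator. */
final class FakeNativeIterator {
    private final List<String> entries;
    private int position;
    boolean closed;

    FakeNativeIterator(List<String> entries) {
        this.entries = entries;
    }

    boolean isValid() { return position < entries.size(); }
    String key() { return entries.get(position); }
    void next() { position++; }
    void close() { closed = true; }
}

/** Owns the native iterator; callers close the wrapper, never the raw iterator. */
final class OwningIteratorWrapper implements Iterator<String>, AutoCloseable {
    private final FakeNativeIterator iterator;

    OwningIteratorWrapper(FakeNativeIterator iterator) {
        this.iterator = iterator;
    }

    @Override
    public boolean hasNext() { return iterator.isValid(); }

    @Override
    public String next() {
        if (!hasNext()) { throw new NoSuchElementException(); }
        String key = iterator.key();
        iterator.next();
        return key;
    }

    @Override
    public void close() { iterator.close(); }

    /** Builds a stream whose close handler goes through the wrapper. */
    static Stream<String> keys(FakeNativeIterator nativeIterator) {
        OwningIteratorWrapper wrapper = new OwningIteratorWrapper(nativeIterator);
        Iterable<String> iterable = () -> wrapper;
        return StreamSupport.stream(iterable.spliterator(), false).onClose(wrapper::close);
    }

    /** Usage: try-with-resources on the stream releases the native iterator. */
    static List<String> collectAndClose(FakeNativeIterator nativeIterator) {
        try (Stream<String> stream = keys(nativeIterator)) {
            return stream.collect(Collectors.toList());
        }
    }
}
```

The stream's close handler refers to `wrapper::close` rather than to the raw iterator, so the wrapper is the single place where the native resource is released.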




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369131#comment-16369131
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169075007
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
--- End diff --

`namespaceBytesLength + keyGroupPrefixBytes` is effectively a constant once the 
constructor has run, so it could be computed there once.




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369133#comment-16369133
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169076426
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
--- End diff --

We could decompose this loop by introducing a helper method 
`isMatchingNameSpace(byte[] key, byte[] nameSpaceBytes)`. I think that makes 
the control flow easier to read.
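
One possible shape for such a helper, assuming (as the comparison loop in the diff suggests) that the serialized namespace forms the trailing bytes of the RocksDB key. The class name `NamespaceSuffixMatcher` and the `minKeyLength` field are illustrative and not part of the PR:

```java
final class NamespaceSuffixMatcher {
    private final byte[] namespaceBytes;
    // Computed once: the shortest key that can hold the key-group prefix plus the namespace.
    private final int minKeyLength;

    NamespaceSuffixMatcher(int keyGroupPrefixBytes, byte[] namespaceBytes) {
        this.namespaceBytes = namespaceBytes.clone();
        this.minKeyLength = keyGroupPrefixBytes + namespaceBytes.length;
    }

    /** Returns true if the key is long enough and ends with the serialized namespace. */
    boolean isMatchingNameSpace(byte[] key) {
        if (key.length < minKeyLength) {
            return false;
        }
        int offset = key.length - namespaceBytes.length;
        for (int i = 0; i < namespaceBytes.length; i++) {
            if (key[offset + i] != namespaceBytes[i]) {
                return false;
            }
        }
        return true;
    }
}
```

With the comparison factored out, the body of `hasNext()` reduces to "advance until a key matches or the iterator is exhausted", and the length check no longer recomputes its bound on every call.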




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369135#comment-16369135
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169069584
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,9 +267,16 @@ public RocksDBKeyedStateBackend(
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
-   Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
-   return targetStream.onClose(iterator::close);
+   try {
+   ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(8);
+   namespaceSerializer.serialize(namespace, new 
DataOutputViewStreamWrapper(outputStream));
+   final byte[] namespaceBytes = 
outputStream.toByteArray();
+   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, 
namespaceBytes);
+   Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
+   return targetStream.onClose(iterator::close);
+   } catch (IOException ex) {
+   throw new FlinkRuntimeException("Failed to get keys 
from RocksDB state backend.", ex);
--- End diff --

This case would not call `iterator.close()`. I suggest creating everything 
related to `namespaceBytes` before creating the iterator, so that the iterator 
is only created once no further exception can happen.
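
The ordering can be sketched as follows. `TrackedIterator` and `NamespaceWriter` are hypothetical stand-ins for the native iterator and the namespace serializer, and the open-iterator counter exists only to make a leak observable in the example:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;

final class OpenIteratorLast {
    /** Number of iterators currently open (illustration only). */
    static final AtomicInteger openIterators = new AtomicInteger();

    /** Hypothetical stand-in for a native resource such as RocksIterator. */
    static final class TrackedIterator implements AutoCloseable {
        final byte[] namespaceBytes;

        TrackedIterator(byte[] namespaceBytes) {
            this.namespaceBytes = namespaceBytes;
            openIterators.incrementAndGet();
        }

        @Override
        public void close() { openIterators.decrementAndGet(); }
    }

    /** Hypothetical serializer interface; writing may fail with IOException. */
    interface NamespaceWriter<N> {
        void write(N namespace, DataOutputStream out) throws IOException;
    }

    static <N> TrackedIterator openFor(N namespace, NamespaceWriter<N> writer) {
        final byte[] namespaceBytes;
        try {
            // Step 1: everything that can throw runs before any native resource exists.
            ByteArrayOutputStream buffer = new ByteArrayOutputStream(8);
            writer.write(namespace, new DataOutputStream(buffer));
            namespaceBytes = buffer.toByteArray();
        } catch (IOException ex) {
            throw new UncheckedIOException("Failed to serialize namespace.", ex);
        }
        // Step 2: only now create the resource that must be closed later.
        return new TrackedIterator(namespaceBytes);
    }
}
```

If serialization fails, no iterator has been opened yet, so there is nothing to leak and no cleanup is needed in the `catch` block.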




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369130#comment-16369130
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169072840
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
--- End diff --

This method has become pretty complex, so maybe we should have a unit test 
for it, similar to e.g. `RocksDBMergeIteratorTest`? In particular, it should 
cover corner cases and different key-group prefix byte sizes.




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369127#comment-16369127
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169066387
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 ---
@@ -62,7 +62,7 @@
 * @param state State variable for which existing keys will be returned.
 * @param namespace Namespace for which existing keys will be returned.
 */
-Stream getKeys(String state, N namespace);
+Stream getKeys(String state, N namespace, TypeSerializer namespaceSerializer);
--- End diff --

New parameter should be referenced in JavaDoc.




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369128#comment-16369128
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169067403
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,9 +267,16 @@ public RocksDBKeyedStateBackend(
        RocksIterator iterator = db.newIterator(columnInfo.f0);
        iterator.seekToFirst();
 
-       Iterable iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
-       Stream targetStream = StreamSupport.stream(iterable.spliterator(), false);
-       return targetStream.onClose(iterator::close);
+       try {
+           ByteArrayOutputStream outputStream = new ByteArrayOutputStream(8);
--- End diff --

It would be better to use our `ByteArrayOutputStreamWithPos`, which avoids the 
unnecessary synchronization.
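
For context, the write methods of `java.io.ByteArrayOutputStream` are declared `synchronized`, which is wasted effort when the buffer is only ever touched by one thread. The sketch below shows the general idea of an unsynchronized growable buffer. `UnsyncByteArrayOutputStream` is a hypothetical class written for this example; it is not the Flink implementation of `ByteArrayOutputStreamWithPos`, whose details may differ.

```java
import java.io.OutputStream;
import java.util.Arrays;

/** Hypothetical sketch of a growable byte buffer without synchronized methods. */
final class UnsyncByteArrayOutputStream extends OutputStream {
    private byte[] buffer;
    private int count;

    UnsyncByteArrayOutputStream(int initialCapacity) {
        this.buffer = new byte[Math.max(1, initialCapacity)];
    }

    @Override
    public void write(int b) {
        ensureCapacity(count + 1);
        buffer[count++] = (byte) b;
    }

    @Override
    public void write(byte[] src, int off, int len) {
        if (off < 0 || len < 0 || off > src.length - len) {
            throw new IndexOutOfBoundsException();
        }
        ensureCapacity(count + len);
        System.arraycopy(src, off, buffer, count, len);
        count += len;
    }

    /** Grows the backing array (at least doubling it) when more room is needed. */
    private void ensureCapacity(int needed) {
        if (needed > buffer.length) {
            buffer = Arrays.copyOf(buffer, Math.max(needed, buffer.length * 2));
        }
    }

    /** Returns a copy of the bytes written so far. */
    byte[] toByteArray() {
        return Arrays.copyOf(buffer, count);
    }

    int size() {
        return count;
    }

    /** Discards the written bytes but keeps the backing array for reuse. */
    void reset() {
        count = 0;
    }

    public static void main(String[] args) {
        UnsyncByteArrayOutputStream out = new UnsyncByteArrayOutputStream(8);
        byte[] namespace = {0, 0, 0, 1};
        out.write(namespace, 0, namespace.length);
        System.out.println(Arrays.toString(out.toByteArray()));
    }
}
```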




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16369132#comment-16369132
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169077434
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+       /**
+        * This class is not thread safety.
+        */
        private static class RocksIteratorWrapper implements Iterator {
            private final RocksIterator iterator;
            private final String state;
            private final TypeSerializer keySerializer;
            private final int keyGroupPrefixBytes;
+           private final byte[] namespaceBytes;
+           private K nextKey;
 
            public RocksIteratorWrapper(
                RocksIterator iterator,
                String state,
                TypeSerializer keySerializer,
-               int keyGroupPrefixBytes) {
+               int keyGroupPrefixBytes,
+               byte[] namespaceBytes) {
                this.iterator = Preconditions.checkNotNull(iterator);
                this.state = Preconditions.checkNotNull(state);
                this.keySerializer = Preconditions.checkNotNull(keySerializer);
                this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes);
+               this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes);
+               this.nextKey = null;
            }
 
            @Override
            public boolean hasNext() {
-               return iterator.isValid();
+               final int namespaceBytesLength = namespaceBytes.length;
+               while (nextKey == null && iterator.isValid()) {
+                   try {
+                       boolean namespaceValid = true;
+                       byte[] key = iterator.key();
+                       if (key.length >= namespaceBytesLength + keyGroupPrefixBytes) {
+                           for (int i = 1; i <= namespaceBytesLength; ++i) {
+                               if (key[key.length - i] != namespaceBytes[namespaceBytesLength - i]) {
+                                   namespaceValid = false;
+                                   break;
+                               }
+                           }
+                           if (namespaceValid) {
+                               DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(
+                                   new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes));
+                               K value = keySerializer.deserialize(dataInput);
+                               if (dataInput.available() == namespaceBytesLength) {
--- End diff --

What is the purpose of this `if`? It looks more like a sanity check, but at 
the same time will silently drop data if it is triggered.
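
One possible alternative, sketched below under simplifying assumptions, is to make the check fail loudly instead of skipping the entry. The example uses plain `java.io` streams and an `int` key in place of Flink's `DataInputViewStreamWrapper` and `TypeSerializer`. `StrictKeyDecoder` is a hypothetical name introduced only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch: decode the key and fail fast if the remaining bytes do not fit. */
final class StrictKeyDecoder {

    /**
     * Assumed layout: key-group prefix, then a 4-byte int key, then the namespace bytes.
     * Throws IllegalStateException if the bytes left after the key are not exactly
     * namespaceLength, instead of silently dropping the entry.
     */
    static int decodeIntKey(byte[] compositeKey, int keyGroupPrefixBytes, int namespaceLength) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(
            compositeKey, keyGroupPrefixBytes, compositeKey.length - keyGroupPrefixBytes));
        try {
            int key = in.readInt();
            int remaining = in.available();
            if (remaining != namespaceLength) {
                throw new IllegalStateException("Expected " + namespaceLength
                    + " namespace bytes after the key, but found " + remaining);
            }
            return key;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // 1 prefix byte, int key 7, 2 namespace bytes.
        byte[] compositeKey = {0, 0, 0, 0, 7, 9, 9};
        System.out.println(decodeIntKey(compositeKey, 1, 2));
    }
}
```

Whether throwing is appropriate depends on whether a length mismatch can occur legitimately, for example when a key in another namespace happens to share the same trailing bytes; in that case skipping the entry may be the intended behaviour and would deserve a comment in the code.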




[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16368217#comment-16368217
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5518
  
@pnowojski Could you please have a look at this?


> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with 
> namespace
> 
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` behaves oddly: it 
> doesn't use the namespace to filter data, whereas 
> `HeapKeyedBackend.getKeys(stateName, namespace)` does. I think they 
> should at least be consistent.





[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16368216#comment-16368216
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5518

[FLINK-8679][State Backends]Fix RocksDBKeyedBackend.getKeys() bug for 
missing namespace condition.

This PR addresses issue 
[FLINK-8679](https://issues.apache.org/jira/browse/FLINK-8679). Currently, 
`RocksDBKeyedBackend.getKeys(stateName, namespace)` behaves oddly: it does not 
use the namespace to filter data. This leads to problems when one key exists in 
several namespaces. 

## Brief change log

- Modify RocksDBKeyedStateBackend.getKeys() to filter data according to 
namespace.
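
The filtering can be pictured as a look-ahead iterator: `hasNext()` advances the underlying iterator until it finds an entry that passes the filter and caches it for `next()`. The following is a minimal generic sketch of that pattern, not the code in this PR. `FilteringIterator` is a hypothetical class, and a simple string suffix check stands in for the namespace byte comparison.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Hypothetical sketch of a look-ahead iterator that skips entries failing a filter. */
final class FilteringIterator<T> implements Iterator<T> {
    private final Iterator<T> source;
    private final Predicate<T> filter;
    // Cached look-ahead element; null means "not computed yet".
    // As a consequence, null elements in the source are skipped.
    private T next;

    FilteringIterator(Iterator<T> source, Predicate<T> filter) {
        this.source = source;
        this.filter = filter;
    }

    @Override
    public boolean hasNext() {
        // Advance until an element passes the filter or the source is exhausted.
        while (next == null && source.hasNext()) {
            T candidate = source.next();
            if (candidate != null && filter.test(candidate)) {
                next = candidate;
            }
        }
        return next != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = next;
        next = null;
        return result;
    }

    public static void main(String[] args) {
        // Entries are "key:namespace"; keep only those in namespace "ns1".
        Iterator<String> entries = Arrays.asList("a:ns1", "b:ns2", "c:ns1").iterator();
        Iterator<String> filtered = new FilteringIterator<>(entries, e -> e.endsWith(":ns1"));
        while (filtered.hasNext()) {
            System.out.println(filtered.next());
        }
    }
}
```

Calling `hasNext()` repeatedly is safe in this sketch because the cached element is only cleared by `next()`.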

## Verifying this change

- This change can be verified by the unit test 
`StateBackendTestBase.testGetKeys()`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink fix_rocksdb_getkeys

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5518.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5518


commit ec861f44c7dd9fb136fdc682154992f115288f77
Author: sihuazhou 
Date:   2018-02-16T17:26:12Z

Fix getKeys() in RocksDBKeyStateBackend.

commit 7b0069421aa4e484aba2a97db2e4c6b3cd88f058
Author: sihuazhou 
Date:   2018-02-17T02:29:59Z

Fix loop bug in `getKeys()`.

commit d50cf63a7da42ebcf5e6ad08e3bbe28b462b1f7c
Author: sihuazhou 
Date:   2018-02-17T14:13:14Z

add test case for different namespace.



