[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576664#comment-16576664
 ] 

ASF GitHub Bot commented on FLINK-10041:


asfgit closed pull request #6501: [FLINK-10041] Extract iterators from 
RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
index 33836f0c781..698a9f97dc0 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayDataInputView.java
@@ -53,4 +53,8 @@ public void setPosition(int pos) {
     public void setData(@Nonnull byte[] buffer, int offset, int length) {
         inStreamWithPos.setBuffer(buffer, offset, length);
     }
+
+    public void setData(@Nonnull byte[] buffer) {
+        setData(buffer, 0, buffer.length);
+    }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
index 32819f84e46..2a9ab7589a9 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBAppendingState.java
@@ -20,8 +20,6 @@
 
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.state.internal.InternalAppendingState;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -63,7 +61,8 @@ SV getInternal(byte[] key) {
             if (valueBytes == null) {
                 return null;
             }
-            return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
+            dataInputView.setData(valueBytes);
+            return valueSerializer.deserialize(dataInputView);
         } catch (IOException | RocksDBException e) {
             throw new FlinkRuntimeException("Error while retrieving data from RocksDB", e);
         }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 7483089106f..65b7f1fa4a7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -20,9 +20,8 @@
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.ByteArrayDataInputView;
+import org.apache.flink.core.memory.ByteArrayDataOutputView;
 import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.internal.InternalKvState;
@@ -67,9 +66,9 @@
 
protected final WriteOptions writeOptions;
 
-   protected final ByteArrayOutputStreamWithPos keySerializationStream;
+   protected final ByteArrayDataOutputView dataOutputView;
 
-   protected final DataOutputView keySerializationDataOutputView;
+   protected final ByteArrayDataInputView dataInputView;
 
private final boolean ambiguousKeyPossible;
 
@@ -98,9 +97,10 @@ protected AbstractRocksDBState(
         this.valueSerializer = Preconditions.checkNotNull(valueSerializer, "State value serializer");
this.defaultValue = defaultValue;
 
-   this.keySerializationStre

[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576660#comment-16576660
 ] 

ASF GitHub Bot commented on FLINK-10041:


StefanRRichter commented on issue #6501: [FLINK-10041] Extract iterators from 
RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#issuecomment-412162109
 
 
   @bowenli86 @azagrebin thanks for the reviews. Merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract all different iterators (inner or static inner classes) into full 
> classes
> -
>
> Key: FLINK-10041
> URL: https://issues.apache.org/jira/browse/FLINK-10041
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576601#comment-16576601
 ] 

ASF GitHub Bot commented on FLINK-10041:


azagrebin commented on issue #6501: [FLINK-10041] Extract iterators from 
RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#issuecomment-412147803
 
 
   👍 LGTM




[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576399#comment-16576399
 ] 

ASF GitHub Bot commented on FLINK-10041:


azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209288519
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
 ##
 @@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
+ * The resulting iteration sequence is ordered by (key-group, kv-state).
+ */
+public class RocksStatesPerKeyGroupMergeIterator implements AutoCloseable {
+
+    private final PriorityQueue<RocksSingleStateIterator> heap;
+    private final int keyGroupPrefixByteCount;
+    private boolean newKeyGroup;
+    private boolean newKVState;
+    private boolean valid;
+    private RocksSingleStateIterator currentSubIterator;
+
+    private static final List<Comparator<RocksSingleStateIterator>> COMPARATORS;
+
+    static {
+        int maxBytes = 2;
+        COMPARATORS = new ArrayList<>(maxBytes);
+        for (int i = 0; i < maxBytes; ++i) {
+            final int currentBytes = i + 1;
+            COMPARATORS.add((o1, o2) -> {
+                int arrayCmpRes = compareKeyGroupsForByteArrays(
+                    o1.getCurrentKey(), o2.getCurrentKey(), currentBytes);
+                return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
+            });
+        }
+    }
+
+    public RocksStatesPerKeyGroupMergeIterator(
+        List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
+        final int keyGroupPrefixByteCount) {
+        Preconditions.checkNotNull(kvStateIterators);
+        Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
+
+        this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
+
+        Comparator<RocksSingleStateIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);
+
+        if (kvStateIterators.size() > 0) {
+            PriorityQueue<RocksSingleStateIterator> iteratorPriorityQueue =
+                new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
+
+            for (Tuple2<RocksIteratorWrapper, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
+                final RocksIteratorWrapper rocksIterator = rocksIteratorWithKVStateId.f0;
+                rocksIterator.seekToFirst();
+                if (rocksIterator.isValid()) {
+                    iteratorPriorityQueue.offer(new RocksSingleStateIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
+                } else {
+                    IOUtils.closeQuietly(rocksIterator);
+                }
+            }
+
+            kvStateIterators.clear();
+
+            this.heap = iteratorPriorityQueue;
+            this.valid = !heap.isEmpty();
+            this.currentSubIterator = heap.poll();
+        } else {
+            // creating a PriorityQueue of size 0 results in an exception.
+            this.heap = null;
+            this.valid = false;
+        }
+
+        this.newKeyGroup = true;
+        this.newKVState = true;
+    }
+
+   /**
+* Advance the iterator. Should only be called if {@l

[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576402#comment-16576402
 ] 

ASF GitHub Bot commented on FLINK-10041:


azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209283620
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
 ##
 @@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class
+ * is not thread safe.
+ *
+ * @param <K> the type of the iterated objects, which are keys in RocksDB.
+ */
+public class RocksStateKeysIterator<K> implements Iterator<K>, AutoCloseable {
+
+    @Nonnull
+    private final RocksIteratorWrapper iterator;
+
+    @Nonnull
+    private final String state;
+
+    @Nonnull
+    private final TypeSerializer<K> keySerializer;
+
+    @Nonnull
+    private final byte[] namespaceBytes;
+
+    private final boolean ambiguousKeyPossible;
+    private final int keyGroupPrefixBytes;
+    private K nextKey;
+    private K previousKey;
+
+    public RocksStateKeysIterator(
+        @Nonnull RocksIteratorWrapper iterator,
+        @Nonnull String state,
+        @Nonnull TypeSerializer<K> keySerializer,
+        int keyGroupPrefixBytes,
+        boolean ambiguousKeyPossible,
+        @Nonnull byte[] namespaceBytes) {
+        this.iterator = iterator;
+        this.state = state;
+        this.keySerializer = keySerializer;
+        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+        this.namespaceBytes = namespaceBytes;
+        this.nextKey = null;
+        this.previousKey = null;
+        this.ambiguousKeyPossible = ambiguousKeyPossible;
+    }
+
+    @Override
+    public boolean hasNext() {
+        try {
+            while (nextKey == null && iterator.isValid()) {
+
+                byte[] key = iterator.key();
+
+                ByteArrayInputStreamWithPos inputStream =
+                    new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);
+
+                DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);
+
+                K value = RocksDBKeySerializationUtils.readKey(
 
 Review comment:
   value -> currentKey



[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576400#comment-16576400
 ] 

ASF GitHub Bot commented on FLINK-10041:


azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209280247
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksSingleStateIterator.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Wraps a RocksDB iterator to cache its current key and assigns an id for the key/value state to the iterator.
+ * Used by {@link RocksStatesPerKeyGroupMergeIterator}.
+ */
+class RocksSingleStateIterator implements AutoCloseable {
+
+   /**
+* @param iterator  The #RocksIterator to wrap .
 
 Review comment:
   Maybe:
   `@param iterator underlying {@link RocksIteratorWrapper}`
   otherwise it reads like a wrapper that wraps a wrapper :)




[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576401#comment-16576401
 ] 

ASF GitHub Bot commented on FLINK-10041:


azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209290925
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksTransformingIteratorWrapper.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+
+import org.rocksdb.RocksIterator;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Wrapper around {@link RocksIteratorWrapper} that applies a given {@link StateSnapshotTransformer} to the elements
 
 Review comment:
   Wrapper around {@link *RocksIterator*}




[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576403#comment-16576403
 ] 

ASF GitHub Bot commented on FLINK-10041:


azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209287458
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStatesPerKeyGroupMergeIterator.java
 ##
 @@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
+ * The resulting iteration sequence is ordered by (key-group, kv-state).
+ */
+public class RocksStatesPerKeyGroupMergeIterator implements AutoCloseable {
+
+    private final PriorityQueue<RocksSingleStateIterator> heap;
+    private final int keyGroupPrefixByteCount;
+    private boolean newKeyGroup;
+    private boolean newKVState;
+    private boolean valid;
+    private RocksSingleStateIterator currentSubIterator;
+
+    private static final List<Comparator<RocksSingleStateIterator>> COMPARATORS;
+
+    static {
+        int maxBytes = 2;
+        COMPARATORS = new ArrayList<>(maxBytes);
+        for (int i = 0; i < maxBytes; ++i) {
+            final int currentBytes = i + 1;
+            COMPARATORS.add((o1, o2) -> {
+                int arrayCmpRes = compareKeyGroupsForByteArrays(
+                    o1.getCurrentKey(), o2.getCurrentKey(), currentBytes);
+                return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
+            });
+        }
+    }
+
+    public RocksStatesPerKeyGroupMergeIterator(
+        List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators,
+        final int keyGroupPrefixByteCount) {
+        Preconditions.checkNotNull(kvStateIterators);
+        Preconditions.checkArgument(keyGroupPrefixByteCount >= 1);
+
+        this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
+
+        Comparator<RocksSingleStateIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount - 1);
+
+        if (kvStateIterators.size() > 0) {
+            PriorityQueue<RocksSingleStateIterator> iteratorPriorityQueue =
 
 Review comment:
   This could also be a separate method, like:
   `PriorityQueue fillQueue(... kvStateIterators)`
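
The suggestion above could look roughly like the following sketch. It is self-contained and does not use the Flink classes: `StateCursor` is a hypothetical stand-in for `RocksSingleStateIterator`, and `FillQueueSketch`, `ORDER` and the parameter list of `fillQueue` are assumptions made for illustration, not the code that was merged.

```java
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-in for RocksSingleStateIterator: only the two ordering fields. */
final class StateCursor {
    final int keyGroup;
    final int kvStateId;

    StateCursor(int keyGroup, int kvStateId) {
        this.keyGroup = keyGroup;
        this.kvStateId = kvStateId;
    }
}

final class FillQueueSketch {

    // Orders by key-group first and kv-state id second, i.e. the
    // (key-group, kv-state) order described in the class Javadoc.
    static final Comparator<StateCursor> ORDER =
        Comparator.comparingInt((StateCursor c) -> c.keyGroup)
            .thenComparingInt(c -> c.kvStateId);

    // Builds the heap in one place. PriorityQueue rejects an initial
    // capacity below 1, so the capacity is clamped instead of special-casing
    // the empty input in the caller.
    static PriorityQueue<StateCursor> fillQueue(List<StateCursor> cursors) {
        PriorityQueue<StateCursor> queue =
            new PriorityQueue<>(Math.max(1, cursors.size()), ORDER);
        queue.addAll(cursors);
        return queue;
    }
}
```

Clamping the capacity is one way to avoid the `null` heap for empty input; whether that is preferable in the real class depends on how `heap` is used elsewhere.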




[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576398#comment-16576398
 ] 

ASF GitHub Bot commented on FLINK-10041:


azagrebin commented on a change in pull request #6501: [FLINK-10041] Extract 
iterators from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#discussion_r209283561
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/iterator/RocksStateKeysIterator.java
 ##
 @@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.iterator;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.contrib.streaming.state.RocksDBKeySerializationUtils;
+import org.apache.flink.contrib.streaming.state.RocksIteratorWrapper;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+
+/**
+ * Adapter class to bridge between {@link RocksIteratorWrapper} and {@link Iterator} to iterate over the keys. This class
+ * is not thread safe.
+ *
+ * @param <K> the type of the iterated objects, which are keys in RocksDB.
+ */
+public class RocksStateKeysIterator<K> implements Iterator<K>, AutoCloseable {
+
+    @Nonnull
+    private final RocksIteratorWrapper iterator;
+
+    @Nonnull
+    private final String state;
+
+    @Nonnull
+    private final TypeSerializer<K> keySerializer;
+
+    @Nonnull
+    private final byte[] namespaceBytes;
+
+    private final boolean ambiguousKeyPossible;
+    private final int keyGroupPrefixBytes;
+    private K nextKey;
+    private K previousKey;
+
+    public RocksStateKeysIterator(
+        @Nonnull RocksIteratorWrapper iterator,
+        @Nonnull String state,
+        @Nonnull TypeSerializer<K> keySerializer,
+        int keyGroupPrefixBytes,
+        boolean ambiguousKeyPossible,
+        @Nonnull byte[] namespaceBytes) {
+        this.iterator = iterator;
+        this.state = state;
+        this.keySerializer = keySerializer;
+        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+        this.namespaceBytes = namespaceBytes;
+        this.nextKey = null;
+        this.previousKey = null;
+        this.ambiguousKeyPossible = ambiguousKeyPossible;
+    }
+
+    @Override
+    public boolean hasNext() {
+        try {
+            while (nextKey == null && iterator.isValid()) {
+
+                byte[] key = iterator.key();
+
+                ByteArrayInputStreamWithPos inputStream =
 
 Review comment:
   I would suggest moving the deserialisation logic into a smaller method, like:
   `Tuple2 deserKeyAndNamespacePos(byte[] keyBytes)`
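
A minimal sketch of such a helper, using only JDK classes rather than Flink's stream wrappers and serializers. The key is assumed to be a string written with `DataOutput.writeUTF`, and the extra `prefixBytes` parameter, the `Map.Entry` return type and the class name `KeyDeserSketch` are assumptions for illustration. The real method would presumably delegate to `RocksDBKeySerializationUtils.readKey` with the configured key serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

final class KeyDeserSketch {

    /**
     * Skips the key-group prefix, reads the key, and returns the key together
     * with the offset of the first byte after it (where the namespace starts).
     */
    static Map.Entry<String, Integer> deserKeyAndNamespacePos(byte[] keyBytes, int prefixBytes) {
        ByteArrayInputStream in =
            new ByteArrayInputStream(keyBytes, prefixBytes, keyBytes.length - prefixBytes);
        try {
            // Assumption: the key was written with DataOutput.writeUTF.
            String key = new DataInputStream(in).readUTF();
            // available() counts the unread bytes, so the difference to the
            // array length is the absolute position after the key.
            int namespacePos = keyBytes.length - in.available();
            return new SimpleImmutableEntry<>(key, namespacePos);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the parsing isolated like this, the loop in `hasNext()` only has to compare the namespace bytes starting at the returned position.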




[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576070#comment-16576070
 ] 

ASF GitHub Bot commented on FLINK-10041:


StefanRRichter commented on issue #6501: [FLINK-10041] Extract iterators from 
RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#issuecomment-412044338
 
 
   CC @azagrebin 




[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570583#comment-16570583
 ] 

ASF GitHub Bot commented on FLINK-10041:


bowenli86 commented on issue #6501: [FLINK-10041] Extract iterators from 
RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501#issuecomment-410801986
 
 
   +1, much cleaner now.  LGTM




[jira] [Commented] (FLINK-10041) Extract all different iterators (inner or static inner classes) into full classes

2018-08-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569933#comment-16569933
 ] 

ASF GitHub Bot commented on FLINK-10041:


StefanRRichter opened a new pull request #6501: [FLINK-10041] Extract iterators 
from RocksDBKeyedStateBackend (inner …
URL: https://github.com/apache/flink/pull/6501
 
 
   ## What is the purpose of the change
   
   `RocksDBKeyedStateBackend` has grown into a huge class with many inner 
classes. This PR is the first in a series of cleanups/refactoring to decompose 
the backend into smaller entities.
   
   ## Brief change log
   
   Refactored all iterators into their own class files.
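
As a rough illustration of this kind of extraction (not code from this PR), the sketch below shows an iterator that could have been an inner class, written as a top-level class that receives its dependencies through the constructor. `SketchBackend` and `PrefixKeysIterator` are hypothetical names.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical backend: after the extraction it only wires up the iterator. */
final class SketchBackend {
    private final List<String> keys;

    SketchBackend(List<String> keys) {
        this.keys = keys;
    }

    Iterator<String> keysWithPrefix(String prefix) {
        // Dependencies are passed explicitly instead of being captured
        // from the enclosing instance.
        return new PrefixKeysIterator(keys.iterator(), prefix);
    }
}

/** Top-level iterator (hypothetical) that can be tested without the backend. */
final class PrefixKeysIterator implements Iterator<String> {
    private final Iterator<String> source;
    private final String prefix;
    private String nextKey;

    PrefixKeysIterator(Iterator<String> source, String prefix) {
        this.source = source;
        this.prefix = prefix;
    }

    @Override
    public boolean hasNext() {
        // Advance until a matching key is buffered or the source is exhausted.
        while (nextKey == null && source.hasNext()) {
            String candidate = source.next();
            if (candidate.startsWith(prefix)) {
                nextKey = candidate;
            }
        }
        return nextKey != null;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        String result = nextKey;
        nextKey = null;
        return result;
    }
}
```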
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   

