[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-14 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334581046
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/SkipListSerializerTest.java
 ##
 @@ -0,0 +1,94 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link SkipListKeySerializer}.
+ */
+public class SkipListSerializerTest extends TestLogger {
+
+   @Test
+   public void testSkipListKeySerializer() throws IOException {
+   TypeSerializer keySerializer = 
StringSerializer.INSTANCE;
+   TypeSerializer namespaceSerializer = 
StringSerializer.INSTANCE;
+
+   SkipListKeySerializer skipListKeySerializer =
+   new SkipListKeySerializer<>(keySerializer, 
namespaceSerializer);
+
+   for (int i = 0; i < 10; i++) {
+   String key = "key-abcdedg" + i;
+   String namespace = "namespace-dfsfdafd" + i;
+
+   byte[] skipListKey = 
skipListKeySerializer.serialize(key, namespace);
+   int offset = 10;
+   byte[] data = new byte[10 + skipListKey.length];
+   System.arraycopy(skipListKey, 0, data, offset, 
skipListKey.length);
+   ByteBuffer skipListKeyByteBuffer = 
ByteBuffer.wrap(data);
+   assertEquals(key, 
skipListKeySerializer.deserializeKey(skipListKeyByteBuffer, offset, 
skipListKey.length));
+   assertEquals(namespace, 
skipListKeySerializer.deserializeNamespace(skipListKeyByteBuffer, offset, 
skipListKey.length));
+
+   Tuple2 serializedKeyAndNamespace =
+   
skipListKeySerializer.getSerializedKeyAndNamespace(skipListKeyByteBuffer, 
offset);
+   assertEquals(key, deserialize(keySerializer, 
serializedKeyAndNamespace.f0));
+   assertEquals(namespace, 
deserialize(namespaceSerializer, serializedKeyAndNamespace.f1));
+
+   byte[] serializedNamespace = 
skipListKeySerializer.serializeNamespace(namespace);
+   assertEquals(namespace, 
deserialize(namespaceSerializer, serializedNamespace));
+   }
+   }
+
+   @Test
+   public void testSkipListValueSerializer() throws IOException {
+   TypeSerializer stateSerializer = 
StringSerializer.INSTANCE;
+   SkipListValueSerializer skipListValueSerializer =
+   new SkipListValueSerializer<>(stateSerializer);
+
+   for (int i = 0; i < 10; i++) {
 
 Review comment:
   Ditto.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-14 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334581016
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/SkipListSerializerTest.java
 ##
 @@ -0,0 +1,94 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link SkipListKeySerializer}.
+ */
+public class SkipListSerializerTest extends TestLogger {
+
+   @Test
+   public void testSkipListKeySerializer() throws IOException {
+   TypeSerializer keySerializer = 
StringSerializer.INSTANCE;
+   TypeSerializer namespaceSerializer = 
StringSerializer.INSTANCE;
+
+   SkipListKeySerializer skipListKeySerializer =
+   new SkipListKeySerializer<>(keySerializer, 
namespaceSerializer);
+
+   for (int i = 0; i < 10; i++) {
 
 Review comment:
   Yes, will refactor the test to better reflect this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-14 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334180670
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -115,577 +144,783 @@ public void testInitStateMap() {
assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext());
 
stateMap.close();
-   assertEquals(0, stateMap.size());
-   assertEquals(0, stateMap.totalSize());
-   assertTrue(stateMap.isClosed());
}
 
/**
-* Test basic operations.
+* Test state put operation.
 */
@Test
-   public void testBasicOperations() throws Exception {
-   TypeSerializer keySerializer = IntSerializer.INSTANCE;
-   TypeSerializer namespaceSerializer = 
LongSerializer.INSTANCE;
-   TypeSerializer stateSerializer = 
StringSerializer.INSTANCE;
-   CopyOnWriteSkipListStateMap stateMap = 
new CopyOnWriteSkipListStateMap<>(
-   keySerializer, namespaceSerializer, stateSerializer, 
spaceAllocator);
+   public void testPutState() {
+   testWithFunction((totalSize, stateMap, referenceStates) -> 
getDefaultSizes(totalSize));
+   }
 
-   ThreadLocalRandom random = ThreadLocalRandom.current();
-   // map to store expected states, namespace -> key -> state
-   Map> referenceStates = new 
HashMap<>();
-   int totalSize = 0;
+   /**
+* Test remove existing state.
+*/
+   @Test
+   public void testRemoveExistingState() {
+   testRemoveState(false, false);
+   }
 
-   // put some states
-   for (long namespace = 0; namespace < 10; namespace++) {
-   for (int key = 0; key < 100; key++) {
-   totalSize++;
-   String state = String.valueOf(key * namespace);
-   if (random.nextBoolean()) {
-   stateMap.put(key, namespace, state);
-   } else {
-   assertNull(stateMap.putAndGetOld(key, 
namespace, state));
+   /**
+* Test remove and get existing state.
+*/
+   @Test
+   public void testRemoveAndGetExistingState() {
+   testRemoveState(false, true);
+   }
+
+   /**
+* Test remove absent state.
+*/
+   @Test
+   public void testRemoveAbsentState() {
+   testRemoveState(true, true);
+   }
+
+   /**
+* Test remove previously removed state.
+*/
+   @Test
+   public void testPutPreviouslyRemovedState() {
+   testWithFunction(
+   (totalSize, stateMap, referenceStates) -> 
applyFunctionAfterRemove(stateMap, referenceStates,
+   (removedCnt, removedStates) -> {
+   int size = totalSize - removedCnt;
+   for (Map.Entry> 
entry : removedStates.entrySet()) {
+   long namespace = entry.getKey();
+   for (int key : 
entry.getValue()) {
+   size++;
+   String state = 
String.valueOf(key * namespace);
+   
assertNull(stateMap.putAndGetOld(key, namespace, state));
+   
referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, 
String.valueOf(state));
+   }
+   }
+   return getDefaultSizes(size);
}
-   referenceStates.computeIfAbsent(namespace, 
(none) -> new HashMap<>()).put(key, state);
-   assertEquals(totalSize, stateMap.size());
-   assertEquals(totalSize, stateMap.totalSize());
-   }
-   }
+   )
+   );
+   }
 
-   // validates space allocation. Each pair need 2 spaces
-   assertEquals(totalSize * 2, 
spaceAllocator.getTotalSpaceNumber());
-   verifyState(referenceStates, stateMap);
+   private void testRemoveState(boolean removeAbsent, boolean getOld) {
+   testWithFunction(
+   (totalSize, stateMap, referenceStates) -> {
+   if (removeAbsent) {
+   

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-13 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334274093
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -115,577 +144,783 @@ public void testInitStateMap() {
assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext());
 
stateMap.close();
-   assertEquals(0, stateMap.size());
-   assertEquals(0, stateMap.totalSize());
-   assertTrue(stateMap.isClosed());
}
 
/**
-* Test basic operations.
+* Test state put operation.
 */
@Test
-   public void testBasicOperations() throws Exception {
-   TypeSerializer keySerializer = IntSerializer.INSTANCE;
-   TypeSerializer namespaceSerializer = 
LongSerializer.INSTANCE;
-   TypeSerializer stateSerializer = 
StringSerializer.INSTANCE;
-   CopyOnWriteSkipListStateMap stateMap = 
new CopyOnWriteSkipListStateMap<>(
-   keySerializer, namespaceSerializer, stateSerializer, 
spaceAllocator);
+   public void testPutState() {
+   testWithFunction((totalSize, stateMap, referenceStates) -> 
getDefaultSizes(totalSize));
+   }
 
-   ThreadLocalRandom random = ThreadLocalRandom.current();
-   // map to store expected states, namespace -> key -> state
-   Map> referenceStates = new 
HashMap<>();
-   int totalSize = 0;
+   /**
+* Test remove existing state.
+*/
+   @Test
+   public void testRemoveExistingState() {
+   testRemoveState(false, false);
+   }
 
-   // put some states
-   for (long namespace = 0; namespace < 10; namespace++) {
-   for (int key = 0; key < 100; key++) {
-   totalSize++;
-   String state = String.valueOf(key * namespace);
-   if (random.nextBoolean()) {
-   stateMap.put(key, namespace, state);
-   } else {
-   assertNull(stateMap.putAndGetOld(key, 
namespace, state));
+   /**
+* Test remove and get existing state.
+*/
+   @Test
+   public void testRemoveAndGetExistingState() {
+   testRemoveState(false, true);
+   }
+
+   /**
+* Test remove absent state.
+*/
+   @Test
+   public void testRemoveAbsentState() {
+   testRemoveState(true, true);
 
 Review comment:
  I don't quite agree here. I think for the boolean parameters, it's normal to 
get their meaning from the javadoc or the parameter names in the method signature.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-13 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334180018
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -115,577 +144,783 @@ public void testInitStateMap() {
assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext());
 
stateMap.close();
-   assertEquals(0, stateMap.size());
-   assertEquals(0, stateMap.totalSize());
-   assertTrue(stateMap.isClosed());
}
 
/**
-* Test basic operations.
+* Test state put operation.
 */
@Test
-   public void testBasicOperations() throws Exception {
-   TypeSerializer keySerializer = IntSerializer.INSTANCE;
-   TypeSerializer namespaceSerializer = 
LongSerializer.INSTANCE;
-   TypeSerializer stateSerializer = 
StringSerializer.INSTANCE;
-   CopyOnWriteSkipListStateMap stateMap = 
new CopyOnWriteSkipListStateMap<>(
-   keySerializer, namespaceSerializer, stateSerializer, 
spaceAllocator);
+   public void testPutState() {
+   testWithFunction((totalSize, stateMap, referenceStates) -> 
getDefaultSizes(totalSize));
+   }
 
-   ThreadLocalRandom random = ThreadLocalRandom.current();
-   // map to store expected states, namespace -> key -> state
-   Map> referenceStates = new 
HashMap<>();
-   int totalSize = 0;
+   /**
+* Test remove existing state.
+*/
+   @Test
+   public void testRemoveExistingState() {
+   testRemoveState(false, false);
+   }
 
-   // put some states
-   for (long namespace = 0; namespace < 10; namespace++) {
-   for (int key = 0; key < 100; key++) {
-   totalSize++;
-   String state = String.valueOf(key * namespace);
-   if (random.nextBoolean()) {
-   stateMap.put(key, namespace, state);
-   } else {
-   assertNull(stateMap.putAndGetOld(key, 
namespace, state));
+   /**
+* Test remove and get existing state.
+*/
+   @Test
+   public void testRemoveAndGetExistingState() {
+   testRemoveState(false, true);
+   }
+
+   /**
+* Test remove absent state.
+*/
+   @Test
+   public void testRemoveAbsentState() {
+   testRemoveState(true, true);
+   }
+
+   /**
+* Test remove previously removed state.
+*/
+   @Test
+   public void testPutPreviouslyRemovedState() {
+   testWithFunction(
+   (totalSize, stateMap, referenceStates) -> 
applyFunctionAfterRemove(stateMap, referenceStates,
+   (removedCnt, removedStates) -> {
+   int size = totalSize - removedCnt;
+   for (Map.Entry> 
entry : removedStates.entrySet()) {
+   long namespace = entry.getKey();
+   for (int key : 
entry.getValue()) {
+   size++;
+   String state = 
String.valueOf(key * namespace);
+   
assertNull(stateMap.putAndGetOld(key, namespace, state));
+   
referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, 
String.valueOf(state));
+   }
+   }
+   return getDefaultSizes(size);
}
-   referenceStates.computeIfAbsent(namespace, 
(none) -> new HashMap<>()).put(key, state);
-   assertEquals(totalSize, stateMap.size());
-   assertEquals(totalSize, stateMap.totalSize());
-   }
-   }
+   )
+   );
 
 Review comment:
  Let me try to improve... It seems that in some cases we suggest using 
functions to reduce duplicated code, while in other cases we advise against it. 
Hopefully I can get a better understanding of the standard/convention.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-12 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334227853
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Allocator.java
 ##
 @@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.space;
+
+import java.io.Closeable;
+
+/**
+ * Implementations are responsible for allocating space.
+ */
+public interface Allocator extends Closeable {
+
+   /**
+* Allocate space with the given size.
+*
+* @param size size of space to allocate.
+* @return address of the allocated space, or -1 when allocation 
fails.
+*/
+   long allocate(int size);
 
 Review comment:
   ok, got the question, and yes we should better handle this part, will 
resolve this in new commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334193967
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -590,10 +542,9 @@ private int compareByteBufferAndNode(ByteBuffer 
keyByteBuffer, int keyOffset, in
 * equal to, or greater than the second.
 */
private int compareNamespaceAndNode(ByteBuffer namespaceByteBuffer, int 
namespaceOffset, int namespaceLen, long targetNode) {
-   Chunk chunk = 
spaceAllocator.getChunkById(SpaceUtils.getChunkIdByAddress(targetNode));
-   int offsetInChunk = 
SpaceUtils.getChunkOffsetByAddress(targetNode);
-   ByteBuffer targetKeyByteBuffer = 
chunk.getByteBuffer(offsetInChunk);
-   int offsetInByteBuffer = 
chunk.getOffsetInByteBuffer(offsetInChunk);
+   Tuple2 tuple2 = 
getNodeByteBufferAndOffset(targetNode);
+   ByteBuffer targetKeyByteBuffer = tuple2.f0;
+   int offsetInByteBuffer = tuple2.f1;
 
 Review comment:
   I think the class is just for improving the readability and I really don't 
want to pass the object everywhere and keep it alive for so long.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334193401
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -1136,10 +1088,9 @@ S helpGetState(long valuePointer, 
SkipListValueSerializer serializer) {
return null;
}
 
-   Chunk chunk = 
spaceAllocator.getChunkById(SpaceUtils.getChunkIdByAddress(valuePointer));
-   int offsetInChunk = 
SpaceUtils.getChunkOffsetByAddress(valuePointer);
-   ByteBuffer bb = chunk.getByteBuffer(offsetInChunk);
-   int offsetInByteBuffer = 
chunk.getOffsetInByteBuffer(offsetInChunk);
+   Tuple2 tuple2 = 
getNodeByteBufferAndOffset(valuePointer);
+   ByteBuffer bb = tuple2.f0;
+   int offsetInByteBuffer = tuple2.f1;
 
 Review comment:
   I think the class is just for improving the readability and I really don't 
want to pass the object everywhere and keep it alive for so long.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334190093
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -472,6 +397,74 @@ S putNode(ByteBuffer keyByteBuffer, int keyOffset, int 
keyLen, byte[] value, boo
 * @return the old state. Null will be returned if key does not exist 
or returnOldState is false.
 */
private S removeNode(ByteBuffer keyByteBuffer, int keyOffset, int 
keyLen, boolean returnOldState) {
+   Tuple4 result = 
iterateAndProcess(keyByteBuffer, keyOffset, keyLen,
+   (tuple3, isRemoved) -> {
+   long prevNode = tuple3.f0;
+   long currentNode = tuple3.f1;
+   long nextNode = tuple3.f2;
+   // if the node has been logically removed, and 
can not be physically
+   // removed here, just return null
+   if (isRemoved && highestRequiredSnapshotVersion 
!= 0) {
+   return null;
+   }
+
+   long oldValuePointer;
+   boolean oldValueNeedFree;
+
+   if (highestRequiredSnapshotVersion == 0) {
+   // do physically remove only when there 
is no snapshot running
+   oldValuePointer = 
doPhysicalRemoveAndGetValue(currentNode, prevNode, nextNode);
+   // the node has been logically removed, 
and remove it from the set
+   if (isRemoved) {
+   
logicallyRemovedNodes.remove(currentNode);
+   }
+   oldValueNeedFree = true;
+   } else {
+   int version = 
SkipListUtils.helpGetNodeLatestVersion(currentNode, spaceAllocator);
+   if (version < 
highestRequiredSnapshotVersion) {
+   // the newest-version value may 
be used by snapshots, and update it with copy-on-write
+   oldValuePointer = 
updateValueWithCopyOnWrite(currentNode, null);
+   oldValueNeedFree = false;
+   } else {
+   // replace the newest-version 
value.
+   oldValuePointer = 
updateValueWithReplace(currentNode, null);
+   oldValueNeedFree = true;
+   }
+
+   helpSetNodeStatus(currentNode, 
NodeStatus.REMOVE);
+   logicallyRemovedNodes.add(currentNode);
+   }
+
+   S oldState = null;
+   if (returnOldState) {
+   oldState = 
helpGetState(oldValuePointer);
+   }
+
+   if (oldValueNeedFree) {
+   spaceAllocator.free(oldValuePointer);
+   }
+
+   return oldState;
+   });
+   return result.f2 ? result.f3 : null;
+   }
+
+   /**
+* Iterate over the skip list and apply the given function.
+*
+* @param keyByteBuffer byte buffer storing the key.
+* @param keyOffset offset of the key.
+* @param keyLen length of the key.
+* @param function the function to apply when the skip list contains 
the given key, which accepts two
+* parameters: a tuple3 of [previous_node, 
current_node, next_node] and a boolean indicating
+* whether the node with same key has been logically 
removed, and returns a state.
+* @return a tuple4 of [previous_node, current_node, key_found, 
state_by_applying_function]
+*/
+   private Tuple4 iterateAndProcess(
+   ByteBuffer keyByteBuffer,
+   int keyOffset,
+   int keyLen,
+   BiFunction, Boolean, S> 
function) {
 
 Review comment:
   Maybe leaving the boolean out is better, please check the updated codes and 
let me know how it looks.


This is an automated message from the Apache 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334189729
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -472,6 +397,74 @@ S putNode(ByteBuffer keyByteBuffer, int keyOffset, int 
keyLen, byte[] value, boo
 * @return the old state. Null will be returned if key does not exist 
or returnOldState is false.
 */
private S removeNode(ByteBuffer keyByteBuffer, int keyOffset, int 
keyLen, boolean returnOldState) {
+   Tuple4 result = 
iterateAndProcess(keyByteBuffer, keyOffset, keyLen,
+   (tuple3, isRemoved) -> {
+   long prevNode = tuple3.f0;
+   long currentNode = tuple3.f1;
+   long nextNode = tuple3.f2;
+   // if the node has been logically removed, and 
can not be physically
+   // removed here, just return null
+   if (isRemoved && highestRequiredSnapshotVersion 
!= 0) {
+   return null;
+   }
+
+   long oldValuePointer;
+   boolean oldValueNeedFree;
+
+   if (highestRequiredSnapshotVersion == 0) {
+   // do physically remove only when there 
is no snapshot running
+   oldValuePointer = 
doPhysicalRemoveAndGetValue(currentNode, prevNode, nextNode);
+   // the node has been logically removed, 
and remove it from the set
+   if (isRemoved) {
+   
logicallyRemovedNodes.remove(currentNode);
+   }
+   oldValueNeedFree = true;
+   } else {
+   int version = 
SkipListUtils.helpGetNodeLatestVersion(currentNode, spaceAllocator);
+   if (version < 
highestRequiredSnapshotVersion) {
+   // the newest-version value may 
be used by snapshots, and update it with copy-on-write
+   oldValuePointer = 
updateValueWithCopyOnWrite(currentNode, null);
+   oldValueNeedFree = false;
+   } else {
+   // replace the newest-version 
value.
+   oldValuePointer = 
updateValueWithReplace(currentNode, null);
+   oldValueNeedFree = true;
+   }
+
+   helpSetNodeStatus(currentNode, 
NodeStatus.REMOVE);
+   logicallyRemovedNodes.add(currentNode);
+   }
+
+   S oldState = null;
+   if (returnOldState) {
+   oldState = 
helpGetState(oldValuePointer);
+   }
+
+   if (oldValueNeedFree) {
+   spaceAllocator.free(oldValuePointer);
+   }
+
+   return oldState;
+   });
+   return result.f2 ? result.f3 : null;
+   }
+
+   /**
+* Iterate over the skip list and apply the given function.
+*
+* @param keyByteBuffer byte buffer storing the key.
+* @param keyOffset offset of the key.
+* @param keyLen length of the key.
+* @param function the function to apply when the skip list contains 
the given key, which accepts two
+* parameters: a tuple3 of [previous_node, 
current_node, next_node] and a boolean indicating
+* whether the node with same key has been logically 
removed, and returns a state.
+* @return a tuple4 of [previous_node, current_node, key_found, 
state_by_applying_function]
+*/
+   private Tuple4 iterateAndProcess(
 
 Review comment:
   Yes it does, it may break earlier if there's no matching key and key of the 
iterated node is already bigger than given key, or continue to the end if all 
existing node keys are smaller than given key. Just check the `if...else 
if...else` clause at end of the method.


This is an automated message from the Apache Git Service.
To 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334180670
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -115,577 +144,783 @@ public void testInitStateMap() {
assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext());
 
stateMap.close();
-   assertEquals(0, stateMap.size());
-   assertEquals(0, stateMap.totalSize());
-   assertTrue(stateMap.isClosed());
}
 
/**
-* Test basic operations.
+* Test state put operation.
 */
@Test
-   public void testBasicOperations() throws Exception {
-   TypeSerializer keySerializer = IntSerializer.INSTANCE;
-   TypeSerializer namespaceSerializer = 
LongSerializer.INSTANCE;
-   TypeSerializer stateSerializer = 
StringSerializer.INSTANCE;
-   CopyOnWriteSkipListStateMap stateMap = 
new CopyOnWriteSkipListStateMap<>(
-   keySerializer, namespaceSerializer, stateSerializer, 
spaceAllocator);
+   public void testPutState() {
+   testWithFunction((totalSize, stateMap, referenceStates) -> 
getDefaultSizes(totalSize));
+   }
 
-   ThreadLocalRandom random = ThreadLocalRandom.current();
-   // map to store expected states, namespace -> key -> state
-   Map> referenceStates = new 
HashMap<>();
-   int totalSize = 0;
+   /**
+* Test remove existing state.
+*/
+   @Test
+   public void testRemoveExistingState() {
+   testRemoveState(false, false);
+   }
 
-   // put some states
-   for (long namespace = 0; namespace < 10; namespace++) {
-   for (int key = 0; key < 100; key++) {
-   totalSize++;
-   String state = String.valueOf(key * namespace);
-   if (random.nextBoolean()) {
-   stateMap.put(key, namespace, state);
-   } else {
-   assertNull(stateMap.putAndGetOld(key, 
namespace, state));
+   /**
+* Test remove and get existing state.
+*/
+   @Test
+   public void testRemoveAndGetExistingState() {
+   testRemoveState(false, true);
+   }
+
+   /**
+* Test remove absent state.
+*/
+   @Test
+   public void testRemoveAbsentState() {
+   testRemoveState(true, true);
+   }
+
+   /**
+* Test remove previously removed state.
+*/
+   @Test
+   public void testPutPreviouslyRemovedState() {
+   testWithFunction(
+   (totalSize, stateMap, referenceStates) -> 
applyFunctionAfterRemove(stateMap, referenceStates,
+   (removedCnt, removedStates) -> {
+   int size = totalSize - removedCnt;
+   for (Map.Entry> 
entry : removedStates.entrySet()) {
+   long namespace = entry.getKey();
+   for (int key : 
entry.getValue()) {
+   size++;
+   String state = 
String.valueOf(key * namespace);
+   
assertNull(stateMap.putAndGetOld(key, namespace, state));
+   
referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, 
String.valueOf(state));
+   }
+   }
+   return getDefaultSizes(size);
}
-   referenceStates.computeIfAbsent(namespace, 
(none) -> new HashMap<>()).put(key, state);
-   assertEquals(totalSize, stateMap.size());
-   assertEquals(totalSize, stateMap.totalSize());
-   }
-   }
+   )
+   );
+   }
 
-   // validates space allocation. Each pair need 2 spaces
-   assertEquals(totalSize * 2, 
spaceAllocator.getTotalSpaceNumber());
-   verifyState(referenceStates, stateMap);
+   private void testRemoveState(boolean removeAbsent, boolean getOld) {
+   testWithFunction(
+   (totalSize, stateMap, referenceStates) -> {
+   if (removeAbsent) {
+   

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334180018
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -115,577 +144,783 @@ public void testInitStateMap() {
assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext());
 
stateMap.close();
-   assertEquals(0, stateMap.size());
-   assertEquals(0, stateMap.totalSize());
-   assertTrue(stateMap.isClosed());
}
 
/**
-* Test basic operations.
+* Test state put operation.
 */
@Test
-   public void testBasicOperations() throws Exception {
-   TypeSerializer keySerializer = IntSerializer.INSTANCE;
-   TypeSerializer namespaceSerializer = 
LongSerializer.INSTANCE;
-   TypeSerializer stateSerializer = 
StringSerializer.INSTANCE;
-   CopyOnWriteSkipListStateMap stateMap = 
new CopyOnWriteSkipListStateMap<>(
-   keySerializer, namespaceSerializer, stateSerializer, 
spaceAllocator);
+   public void testPutState() {
+   testWithFunction((totalSize, stateMap, referenceStates) -> 
getDefaultSizes(totalSize));
+   }
 
-   ThreadLocalRandom random = ThreadLocalRandom.current();
-   // map to store expected states, namespace -> key -> state
-   Map> referenceStates = new 
HashMap<>();
-   int totalSize = 0;
+   /**
+* Test remove existing state.
+*/
+   @Test
+   public void testRemoveExistingState() {
+   testRemoveState(false, false);
+   }
 
-   // put some states
-   for (long namespace = 0; namespace < 10; namespace++) {
-   for (int key = 0; key < 100; key++) {
-   totalSize++;
-   String state = String.valueOf(key * namespace);
-   if (random.nextBoolean()) {
-   stateMap.put(key, namespace, state);
-   } else {
-   assertNull(stateMap.putAndGetOld(key, 
namespace, state));
+   /**
+* Test remove and get existing state.
+*/
+   @Test
+   public void testRemoveAndGetExistingState() {
+   testRemoveState(false, true);
+   }
+
+   /**
+* Test remove absent state.
+*/
+   @Test
+   public void testRemoveAbsentState() {
+   testRemoveState(true, true);
+   }
+
+   /**
+* Test remove previously removed state.
+*/
+   @Test
+   public void testPutPreviouslyRemovedState() {
+   testWithFunction(
+   (totalSize, stateMap, referenceStates) -> 
applyFunctionAfterRemove(stateMap, referenceStates,
+   (removedCnt, removedStates) -> {
+   int size = totalSize - removedCnt;
+   for (Map.Entry> 
entry : removedStates.entrySet()) {
+   long namespace = entry.getKey();
+   for (int key : 
entry.getValue()) {
+   size++;
+   String state = 
String.valueOf(key * namespace);
+   
assertNull(stateMap.putAndGetOld(key, namespace, state));
+   
referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, 
String.valueOf(state));
+   }
+   }
+   return getDefaultSizes(size);
}
-   referenceStates.computeIfAbsent(namespace, 
(none) -> new HashMap<>()).put(key, state);
-   assertEquals(totalSize, stateMap.size());
-   assertEquals(totalSize, stateMap.totalSize());
-   }
-   }
+   )
+   );
 
 Review comment:
  Let me try to improve... It seems that sometimes we suggest using functions to 
reduce duplicated code, while in other cases we are against it. Hopefully I 
can get to know the standard/convention better.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334177724
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -1367,6 +1317,14 @@ public Long next() {
}
}
 
+   private Tuple2 getNodeByteBufferAndOffset(long 
node) {
 
 Review comment:
  OK, the change was made following a [previous review 
comment](https://github.com/apache/flink/pull/9501#discussion_r320370445), and 
let me improve it further. However, I'd like to point out again that such 
operations are on the core path and will be invoked frequently, which makes the 
creation of temporary objects a real burden for the GC. I cannot yet tell the 
detailed performance impact or give numbers since the work is still halfway (or less) 
upstreamed, but I will keep an eye on the cost of such refactoring.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334154082
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link CopyOnWriteSkipListStateMap}.
+ */
+public class CopyOnWriteSkipListStateMapTest extends TestLogger {
+
+   private TestAllocator spaceAllocator;
+
+   @Before
+   public void setUp() {
+   int maxAllocateSize = 256;
+   spaceAllocator = new TestAllocator(maxAllocateSize);
+   }
+
+   @After
+   public void tearDown() {
+   IOUtils.closeQuietly(spaceAllocator);
+   }
+
+   /**
+* Test initialization of state map.
+*/
+   @Test
+   public void testInitStateMap() {
+   TypeSerializer keySerializer = IntSerializer.INSTANCE;
+   TypeSerializer namespaceSerializer = 
LongSerializer.INSTANCE;
+   TypeSerializer stateSerializer = 
StringSerializer.INSTANCE;
+   CopyOnWriteSkipListStateMap stateMap = 
new CopyOnWriteSkipListStateMap<>(
+   keySerializer, namespaceSerializer, stateSerializer, 
spaceAllocator);
+
+   assertTrue(stateMap.isEmpty());
+   assertEquals(0, stateMap.size());
+   assertEquals(0, stateMap.totalSize());
+   assertEquals(0, stateMap.getRequestCount());
+   assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty());
+   assertEquals(0, stateMap.getHighestRequiredSnapshotVersion());
+   assertEquals(0, stateMap.getHighestFinishedSnapshotVersion());
+   assertTrue(stateMap.getSnapshotVersions().isEmpty());
+   assertTrue(stateMap.getPruningValueNodes().isEmpty());
+   assertEquals(0, stateMap.getResourceGuard().getLeaseCount());
+   assertFalse(stateMap.getResourceGuard().isClosed());
+   assertFalse(stateMap.isClosed());
+
+   assertNull(stateMap.get(0, 0L));
+   

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334119571
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link CopyOnWriteSkipListStateMap}.
+ */
+public class CopyOnWriteSkipListStateMapTest extends TestLogger {
+
+   private TestAllocator spaceAllocator;
+
+   @Before
+   public void setUp() {
+   int maxAllocateSize = 256;
+   spaceAllocator = new TestAllocator(maxAllocateSize);
+   }
+
+   @After
+   public void tearDown() {
+   IOUtils.closeQuietly(spaceAllocator);
+   }
+
+   /**
+* Test initialization of state map.
+*/
+   @Test
+   public void testInitStateMap() {
+   TypeSerializer keySerializer = IntSerializer.INSTANCE;
+   TypeSerializer namespaceSerializer = 
LongSerializer.INSTANCE;
+   TypeSerializer stateSerializer = 
StringSerializer.INSTANCE;
+   CopyOnWriteSkipListStateMap stateMap = 
new CopyOnWriteSkipListStateMap<>(
+   keySerializer, namespaceSerializer, stateSerializer, 
spaceAllocator);
+
+   assertTrue(stateMap.isEmpty());
+   assertEquals(0, stateMap.size());
+   assertEquals(0, stateMap.totalSize());
+   assertEquals(0, stateMap.getRequestCount());
+   assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty());
+   assertEquals(0, stateMap.getHighestRequiredSnapshotVersion());
+   assertEquals(0, stateMap.getHighestFinishedSnapshotVersion());
+   assertTrue(stateMap.getSnapshotVersions().isEmpty());
+   assertTrue(stateMap.getPruningValueNodes().isEmpty());
+   assertEquals(0, stateMap.getResourceGuard().getLeaseCount());
+   assertFalse(stateMap.getResourceGuard().isClosed());
+   assertFalse(stateMap.isClosed());
+
+   assertNull(stateMap.get(0, 0L));
+   

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334116642
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/SpaceUtils.java
 ##
 @@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.space;
+
+import static 
org.apache.flink.runtime.state.heap.space.Constants.FOUR_BYTES_BITS;
+import static 
org.apache.flink.runtime.state.heap.space.Constants.FOUR_BYTES_MARK;
+
+/**
+ * Utils.
+ */
+public class SpaceUtils {
+
+   public static int getChunkIdByAddress(long offset) {
+   return (int) ((offset >>> FOUR_BYTES_BITS) & FOUR_BYTES_MARK);
+   }
+
+   public static int getChunkOffsetByAddress(long offset) {
+   return (int) (offset & FOUR_BYTES_MARK);
+   }
+}
 
 Review comment:
  Replies/discussions have been unified in other comments; closing the conversation here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334111093
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/core/memory/ByteBufferUtilsTest.java
 ##
 @@ -0,0 +1,195 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.core.memory;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+
+/**
+ * Tests for {@link ByteBufferUtils}.
+ */
+public class ByteBufferUtilsTest extends TestLogger {
+
+   @Test
+   public void testDirectBBWriteAndRead() {
+   testWithDifferentOffset(true);
+   }
+
+   @Test
+   public void testHeapBBWriteAndRead() {
+   testWithDifferentOffset(false);
+   }
+
+   @Test
+   public void testCompareDirectBBToArray() {
+   testCompareTo(true, false, false);
+   }
+
+   @Test
+   public void testCompareDirectBBToDirectBB() {
+   testCompareTo(true, true, true);
+   }
+
+   @Test
+   public void testCompareDirectBBToHeapBB() {
+   testCompareTo(true, true, false);
+   }
+
+   @Test
+   public void testCompareHeapBBToArray() {
+   testCompareTo(false, false, false);
+   }
+
+   @Test
+   public void testCompareHeapBBToDirectBB() {
+   testCompareTo(false, true, true);
+   }
+
+   @Test
+   public void testCompareHeapBBToHeapBB() {
+   testCompareTo(false, true, false);
+   }
+
+   private void testCompareTo(boolean isLeftBBDirect, boolean 
isRightBuffer, boolean isRightDirect) {
+   testEquals(isLeftBBDirect, isRightBuffer, isRightDirect);
+   testLessThan(isLeftBBDirect, isRightBuffer, isRightDirect);
+   testGreaterThan(isLeftBBDirect, isRightBuffer, isRightDirect);
+   }
+
+   private void testEquals(boolean isLeftBBDirect, boolean isRightBuffer, 
boolean isRightDirect) {
+   byte[] leftBufferBytes = new byte[]{'a', 'b', 'c', 'd', 'e'};
+   byte[] rightBufferBytes = new byte[]{'b', 'c', 'd', 'e', 'f'};
+   ByteBuffer left = isLeftBBDirect
+   ? 
ByteBuffer.allocateDirect(leftBufferBytes.length).put(leftBufferBytes)
+   : ByteBuffer.wrap(leftBufferBytes);
+   ByteBuffer right = null;
+   if (isRightBuffer) {
+   right = isRightDirect
+   ? 
ByteBuffer.allocateDirect(rightBufferBytes.length).put(rightBufferBytes)
+   : ByteBuffer.wrap(rightBufferBytes);
+   }
+   if (right != null) {
+   Assert.assertThat(
+   ByteBufferUtils.compareTo(left, 1, 4, right, 0, 
4),
 
 Review comment:
  We will still need different starting indices if we want to cover all 
`lessThan`, `equals` and `greaterThan` comparisons with two equal arrays, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334106184
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334106138
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334105383
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334087856
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-11 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r334087286
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-05 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r331758223
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-10-05 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r331757816
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-27 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r329037403
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link CopyOnWriteSkipListStateMap}.
+ */
+public class CopyOnWriteSkipListStateMapTest extends TestLogger {
+
+   private TestAllocator spaceAllocator;
+
+   @Before
+   public void setUp() {
+   int maxAllocateSize = 256;
+   spaceAllocator = new TestAllocator(maxAllocateSize);
+   }
+
+   @After
+   public void tearDown() {
+   IOUtils.closeQuietly(spaceAllocator);
+   }
+
+   /**
+* Test initialization of state map.
+*/
+   @Test
+   public void testInitStateMap() {
+   TypeSerializer keySerializer = IntSerializer.INSTANCE;
+   TypeSerializer namespaceSerializer = 
LongSerializer.INSTANCE;
+   TypeSerializer stateSerializer = 
StringSerializer.INSTANCE;
+   CopyOnWriteSkipListStateMap stateMap = 
new CopyOnWriteSkipListStateMap<>(
+   keySerializer, namespaceSerializer, stateSerializer, 
spaceAllocator);
+
+   assertTrue(stateMap.isEmpty());
+   assertEquals(0, stateMap.size());
+   assertEquals(0, stateMap.totalSize());
+   assertEquals(0, stateMap.getRequestCount());
+   assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty());
+   assertEquals(0, stateMap.getHighestRequiredSnapshotVersion());
+   assertEquals(0, stateMap.getHighestFinishedSnapshotVersion());
+   assertTrue(stateMap.getSnapshotVersions().isEmpty());
+   assertTrue(stateMap.getPruningValueNodes().isEmpty());
+   assertEquals(0, stateMap.getResourceGuard().getLeaseCount());
+   assertFalse(stateMap.getResourceGuard().isClosed());
+   assertFalse(stateMap.isClosed());
+
+   assertNull(stateMap.get(0, 0L));
+   

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-27 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r329036607
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link CopyOnWriteSkipListStateMap}.
+ */
+public class CopyOnWriteSkipListStateMapTest extends TestLogger {
+
+   private TestAllocator spaceAllocator;
+
+   @Before
+   public void setUp() {
+   int maxAllocateSize = 256;
+   spaceAllocator = new TestAllocator(maxAllocateSize);
+   }
+
+   @After
+   public void tearDown() {
+   IOUtils.closeQuietly(spaceAllocator);
+   }
+
+   /**
+* Test initialization of state map.
+*/
+   @Test
+   public void testInitStateMap() {
+   TypeSerializer keySerializer = IntSerializer.INSTANCE;
+   TypeSerializer namespaceSerializer = 
LongSerializer.INSTANCE;
+   TypeSerializer stateSerializer = 
StringSerializer.INSTANCE;
+   CopyOnWriteSkipListStateMap stateMap = 
new CopyOnWriteSkipListStateMap<>(
+   keySerializer, namespaceSerializer, stateSerializer, 
spaceAllocator);
+
+   assertTrue(stateMap.isEmpty());
+   assertEquals(0, stateMap.size());
+   assertEquals(0, stateMap.totalSize());
+   assertEquals(0, stateMap.getRequestCount());
+   assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty());
+   assertEquals(0, stateMap.getHighestRequiredSnapshotVersion());
+   assertEquals(0, stateMap.getHighestFinishedSnapshotVersion());
+   assertTrue(stateMap.getSnapshotVersions().isEmpty());
+   assertTrue(stateMap.getPruningValueNodes().isEmpty());
+   assertEquals(0, stateMap.getResourceGuard().getLeaseCount());
+   assertFalse(stateMap.getResourceGuard().isClosed());
+   assertFalse(stateMap.isClosed());
+
+   assertNull(stateMap.get(0, 0L));
+   

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-27 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r329030111
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java
 ##
 @@ -0,0 +1,1448 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link CopyOnWriteSkipListStateMap}.
+ */
+public class CopyOnWriteSkipListStateMapTest extends TestLogger {
+
+   private TestAllocator spaceAllocator;
+
+   @Before
+   public void setUp() {
+   int maxAllocateSize = 256;
+   spaceAllocator = new TestAllocator(maxAllocateSize);
+   }
+
+   @After
+   public void tearDown() {
+   IOUtils.closeQuietly(spaceAllocator);
+   }
+
+   /**
+* Test initialization of state map.
+*/
+   @Test
+   public void testInitStateMap() {
+   TypeSerializer keySerializer = IntSerializer.INSTANCE;
+   TypeSerializer namespaceSerializer = 
LongSerializer.INSTANCE;
+   TypeSerializer stateSerializer = 
StringSerializer.INSTANCE;
+   CopyOnWriteSkipListStateMap stateMap = 
new CopyOnWriteSkipListStateMap<>(
+   keySerializer, namespaceSerializer, stateSerializer, 
spaceAllocator);
+
+   assertTrue(stateMap.isEmpty());
+   assertEquals(0, stateMap.size());
+   assertEquals(0, stateMap.totalSize());
+   assertEquals(0, stateMap.getRequestCount());
+   assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty());
+   assertEquals(0, stateMap.getHighestRequiredSnapshotVersion());
+   assertEquals(0, stateMap.getHighestFinishedSnapshotVersion());
+   assertTrue(stateMap.getSnapshotVersions().isEmpty());
+   assertTrue(stateMap.getPruningValueNodes().isEmpty());
+   assertEquals(0, stateMap.getResourceGuard().getLeaseCount());
+   assertFalse(stateMap.getResourceGuard().isClosed());
+   assertFalse(stateMap.isClosed());
+
+   assertNull(stateMap.get(0, 0L));
+   

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-26 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r328482896
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/SpaceUtils.java
 ##
 @@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.space;
+
+import static 
org.apache.flink.runtime.state.heap.space.Constants.FOUR_BYTES_BITS;
+import static 
org.apache.flink.runtime.state.heap.space.Constants.FOUR_BYTES_MARK;
+
+/**
+ * Utility methods for decomposing a space address (a {@code long}) into its
+ * chunk id and its offset within that chunk.
+ *
+ * <p>The upper part of the address holds the chunk id and the lower part holds
+ * the intra-chunk offset; the exact split is defined by
+ * {@code Constants.FOUR_BYTES_BITS} / {@code Constants.FOUR_BYTES_MARK}
+ * (presumably a 32/32 bit split given the constant names — confirm against
+ * {@code Constants}).
+ */
+public class SpaceUtils {
+
+   /**
+    * Returns the chunk id encoded in the upper bits of the given address.
+    *
+    * @param offset the combined space address
+    * @return the id of the chunk this address points into
+    */
+   public static int getChunkIdByAddress(long offset) {
+   return (int) ((offset >>> FOUR_BYTES_BITS) & FOUR_BYTES_MARK);
+   }
+
+   /**
+    * Returns the offset within the chunk encoded in the lower bits of the
+    * given address.
+    *
+    * @param offset the combined space address
+    * @return the offset inside the chunk identified by {@link #getChunkIdByAddress(long)}
+    */
+   public static int getChunkOffsetByAddress(long offset) {
+   return (int) (offset & FOUR_BYTES_MARK);
+   }
+}
 
 Review comment:
   I admit that we have sacrificed the readability for performance in multiple 
places, but for lower level implementations I'm afraid such tradeoff is 
necessary. We could also see examples in JDK implementation such as 
`ConcurrentSkipListMap` (smile).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-26 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r328480573
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Chunk.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.space;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A Chunk is a contiguous ByteBuffer, or a logically contiguous space.
+ * For example, a 1 GB Chunk may be backed by one big file, or by multiple
+ * 4 MB on-heap ByteBuffers.
+ */
+public interface Chunk {
+   /**
+* Try to allocate size bytes from the chunk. spaceSizeInfo will record 
occupied space size.
+*
+* @return the offset of the successful allocation, or -1 to indicate 
not-enough-space
+*/
+   int allocate(int len);
+
+   /**
+* release the space addressed by interChunkOffset. spaceSizeInfo will 
record occupied space size.
+*
+* @param interChunkOffset offset of the chunk
+*/
+   void free(int interChunkOffset);
+
+   /**
+* @return Id of this Chunk
+*/
+   int getChunkId();
+
+   int getChunkCapacity();
+
+   /**
+* @return This chunk's backing ByteBuffer described by chunkOffset.
 
 Review comment:
   It's always backed by one or multiple `ByteBuffer`. For memory-mapped file 
it will be one `MappedByteBuffer` and for memory it will be one or multiple 
`HeapByteBuffer` or `DirectByteBuffer`. Will update the javadoc of the class to 
clarify these.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-26 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r328474557
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Chunk.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.space;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A Chunk is a contiguous ByteBuffer, or a logically contiguous space.
+ * For example, a 1 GB Chunk may be backed by one big file, or by multiple
+ * 4 MB on-heap ByteBuffers.
+ */
+public interface Chunk {
+   /**
+* Try to allocate size bytes from the chunk. spaceSizeInfo will record 
occupied space size.
+*
+* @return the offset of the successful allocation, or -1 to indicate 
not-enough-space
 
 Review comment:
   Ditto.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-26 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r328474162
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Chunk.java
 ##
 @@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.space;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A Chunk is a contiguous ByteBuffer, or a logically contiguous space.
+ * For example, a 1 GB Chunk may be backed by one big file, or by multiple
+ * 4 MB on-heap ByteBuffers.
+ */
+public interface Chunk {
+   /**
+* Try to allocate size bytes from the chunk. spaceSizeInfo will record 
occupied space size.
 
 Review comment:
   Some legacy javadoc comments, should be removed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-26 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r328473311
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Allocator.java
 ##
 @@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap.space;
+
+import java.io.Closeable;
+
+/**
+ * Implementations are responsible for allocating space.
+ */
+public interface Allocator extends Closeable {
+
+   /**
+* Allocate space with the given size.
+*
+* @param size size of space to allocate.
+* @return address of the allocated space, or -1 when allocation fails.
+*/
+   long allocate(int size);
 
 Review comment:
   We will check against `-1`, see `SkipListUtils.NIL_VALUE_POINTER`.
   
   We don't throw exception here because the definition of methods in 
`StateMap` don't throw exceptions, so we will need to handle the exception 
somewhere if we throw any in lower level functions, then no big difference from 
handling `-1`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-26 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r328464527
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-26 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r328460576
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-26 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r328460043
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-26 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r328456850
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-26 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r328452591
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-25 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r328435002
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-25 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r328091422
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-25 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r328036088
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-25 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r327990096
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-25 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r327989026
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-19 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r326183003
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-19 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r326159393
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-19 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r326159038
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-18 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r325990356
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-18 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r325987134
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param <K> type of key
+ * @param <N> type of namespace
+ * @param <S> type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-18 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r325986596
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. States will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-18 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r325980641
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. States will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-18 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r325743874
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/core/memory/ByteBufferUtilsTest.java
 ##
 @@ -0,0 +1,195 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.core.memory;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+
+/**
+ * Tests for {@link ByteBufferUtils}.
+ */
+public class ByteBufferUtilsTest extends TestLogger {
+
+   @Test
+   public void testDirectBBWriteAndRead() {
+   testWithDifferentOffset(true);
+   }
+
+   @Test
+   public void testHeapBBWriteAndRead() {
+   testWithDifferentOffset(false);
+   }
+
+   @Test
+   public void testCompareDirectBBToArray() {
+   testCompareTo(true, false, false);
+   }
+
+   @Test
+   public void testCompareDirectBBToDirectBB() {
+   testCompareTo(true, true, true);
+   }
+
+   @Test
+   public void testCompareDirectBBToHeapBB() {
+   testCompareTo(true, true, false);
+   }
+
+   @Test
+   public void testCompareHeapBBToArray() {
+   testCompareTo(false, false, false);
+   }
+
+   @Test
+   public void testCompareHeapBBToDirectBB() {
+   testCompareTo(false, true, true);
+   }
+
+   @Test
+   public void testCompareHeapBBToHeapBB() {
+   testCompareTo(false, true, false);
+   }
+
+   private void testCompareTo(boolean isLeftBBDirect, boolean 
isRightBuffer, boolean isRightDirect) {
+   testEquals(isLeftBBDirect, isRightBuffer, isRightDirect);
+   testLessThan(isLeftBBDirect, isRightBuffer, isRightDirect);
+   testGreaterThan(isLeftBBDirect, isRightBuffer, isRightDirect);
+   }
+
+   private void testEquals(boolean isLeftBBDirect, boolean isRightBuffer, 
boolean isRightDirect) {
+   byte[] leftBufferBytes = new byte[]{'a', 'b', 'c', 'd', 'e'};
+   byte[] rightBufferBytes = new byte[]{'b', 'c', 'd', 'e', 'f'};
+   ByteBuffer left = isLeftBBDirect
+   ? 
ByteBuffer.allocateDirect(leftBufferBytes.length).put(leftBufferBytes)
+   : ByteBuffer.wrap(leftBufferBytes);
+   ByteBuffer right = null;
+   if (isRightBuffer) {
+   right = isRightDirect
+   ? 
ByteBuffer.allocateDirect(rightBufferBytes.length).put(rightBufferBytes)
+   : ByteBuffer.wrap(rightBufferBytes);
+   }
+   if (right != null) {
+   Assert.assertThat(
+   ByteBufferUtils.compareTo(left, 1, 4, right, 0, 
4),
 
 Review comment:
  Just to verify the `compareTo` method could correctly work with non-zero 
offsets.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-18 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r325763513
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/core/memory/ByteBufferUtilsTest.java
 ##
 @@ -0,0 +1,195 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.core.memory;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+
+/**
+ * Tests for {@link ByteBufferUtils}.
+ */
+public class ByteBufferUtilsTest extends TestLogger {
+
+   @Test
+   public void testDirectBBWriteAndRead() {
+   testWithDifferentOffset(true);
+   }
+
+   @Test
+   public void testHeapBBWriteAndRead() {
+   testWithDifferentOffset(false);
+   }
+
+   @Test
+   public void testCompareDirectBBToArray() {
+   testCompareTo(true, false, false);
+   }
+
+   @Test
+   public void testCompareDirectBBToDirectBB() {
+   testCompareTo(true, true, true);
+   }
+
+   @Test
+   public void testCompareDirectBBToHeapBB() {
+   testCompareTo(true, true, false);
+   }
+
+   @Test
+   public void testCompareHeapBBToArray() {
+   testCompareTo(false, false, false);
+   }
+
+   @Test
+   public void testCompareHeapBBToDirectBB() {
+   testCompareTo(false, true, true);
+   }
+
+   @Test
+   public void testCompareHeapBBToHeapBB() {
+   testCompareTo(false, true, false);
+   }
+
+   private void testCompareTo(boolean isLeftBBDirect, boolean 
isRightBuffer, boolean isRightDirect) {
+   testEquals(isLeftBBDirect, isRightBuffer, isRightDirect);
+   testLessThan(isLeftBBDirect, isRightBuffer, isRightDirect);
+   testGreaterThan(isLeftBBDirect, isRightBuffer, isRightDirect);
+   }
+
+   private void testEquals(boolean isLeftBBDirect, boolean isRightBuffer, 
boolean isRightDirect) {
+   byte[] leftBufferBytes = new byte[]{'a', 'b', 'c', 'd', 'e'};
+   byte[] rightBufferBytes = new byte[]{'b', 'c', 'd', 'e', 'f'};
+   ByteBuffer left = isLeftBBDirect
+   ? 
ByteBuffer.allocateDirect(leftBufferBytes.length).put(leftBufferBytes)
+   : ByteBuffer.wrap(leftBufferBytes);
+   ByteBuffer right = null;
+   if (isRightBuffer) {
+   right = isRightDirect
+   ? 
ByteBuffer.allocateDirect(rightBufferBytes.length).put(rightBufferBytes)
+   : ByteBuffer.wrap(rightBufferBytes);
+   }
+   if (right != null) {
+   Assert.assertThat(
+   ByteBufferUtils.compareTo(left, 1, 4, right, 0, 
4),
+   is(0));
+   } else {
+   Assert.assertThat(
+   ByteBufferUtils.compareTo(left, 1, 4, 
rightBufferBytes, 0, 4),
+   is(0));
+   }
+   }
+
+   private void testLessThan(boolean isLeftBBDirect, boolean 
isRightBuffer, boolean isRightDirect) {
+   byte[] leftBufferBytes = new byte[]{'a', 'b', 'c', 'd', 'e'};
+   byte[] rightBufferBytes = new byte[]{'b', 'c', 'd', 'e', 'f'};
+   ByteBuffer left = isLeftBBDirect
+   ? 
ByteBuffer.allocateDirect(leftBufferBytes.length).put(leftBufferBytes)
+   : ByteBuffer.wrap(leftBufferBytes);
+   ByteBuffer right = null;
+   if (isRightBuffer) {
+   right = isRightDirect
+   ? 
ByteBuffer.allocateDirect(rightBufferBytes.length).put(rightBufferBytes)
+   : ByteBuffer.wrap(rightBufferBytes);
+ 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-18 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r325712863
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListUtils.java
 ##
 @@ -0,0 +1,797 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.core.memory.ByteBufferUtils;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities for skip list.
+ */
+@SuppressWarnings("WeakerAccess")
+public class SkipListUtils {
+   static final long NIL_NODE = -1;
+   static final long HEAD_NODE = -2;
+   static final long NIL_VALUE_POINTER = -1;
+   static final int MAX_LEVEL = 255;
+   static final int DEFAULT_LEVEL = 32;
+   static final int BYTE_MASK = 0xFF;
+
+   /**
+* Key space schema.
+* - key meta
+* -- int: level & status
+* --   byte 0: level of node in skip list
+* --   byte 1: node status
+* --   byte 2: preserve
+* --   byte 3: preserve
+* -- int: length of key
+* -- long: pointer to the newest value
+* -- long: pointer to next node on level 0
+* -- long[]: array of pointers to next node on different levels 
excluding level 0
+* -- long[]: array of pointers to previous node on different levels 
excluding level 0
+* - byte[]: data of key
+*/
+   static final int KEY_META_OFFSET = 0;
+   static final int KEY_LEN_OFFSET = KEY_META_OFFSET + Integer.BYTES;
+   static final int VALUE_POINTER_OFFSET = KEY_LEN_OFFSET + Integer.BYTES;
+   static final int NEXT_KEY_POINTER_OFFSET = VALUE_POINTER_OFFSET + 
Long.BYTES;
+   static final int LEVEL_INDEX_OFFSET = NEXT_KEY_POINTER_OFFSET + 
Long.BYTES;
+
+
+   /**
+* Pre-compute the offset of index for different levels to dismiss the 
duplicated
+* computation at runtime.
+*/
+   private static final int[] INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY = new 
int[MAX_LEVEL + 1];
+
+   /**
+* Pre-compute the length of key meta for different levels to dismiss 
the duplicated
+* computation at runtime.
+*/
+   private static final int[] KEY_META_LEN_BY_LEVEL_ARRAY = new 
int[MAX_LEVEL + 1];
+
+   static {
+   for (int i = 1; i < INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY.length; 
i++) {
+   INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY[i] = 
LEVEL_INDEX_OFFSET + (i - 1) * Long.BYTES;
+   }
+
+   for (int i = 0; i < KEY_META_LEN_BY_LEVEL_ARRAY.length; i++) {
+   KEY_META_LEN_BY_LEVEL_ARRAY[i] = LEVEL_INDEX_OFFSET + 2 
* i * Long.BYTES;
+   }
+   }
+
+   /**
+* Returns the level of the node.
+*
+* @param byteBuffer byte buffer for key space.
+* @param offset offset of key space in the byte buffer.
+*/
+   public static int getLevel(ByteBuffer byteBuffer, int offset) {
+   return ByteBufferUtils.toInt(byteBuffer, offset + 
KEY_META_OFFSET) & BYTE_MASK;
+   }
+
+   /**
+* Returns the status of the node.
+*
+* @param byteBuffer byte buffer for key space.
+* @param offset offset of key space in the byte buffer.
+*/
+   public static byte getNodeStatus(ByteBuffer byteBuffer, int offset) {
 
 Review comment:
  We thought adding a `valueOf` in `NodeStatus` and doing a switch on it would have 
worse performance, but after a JMH check there's no big difference between 
"first calling valueOf and then comparing the enum" and "passing the byte and 
comparing with getValue", so we will change to the more readable way.


This is an automated message from the Apache Git 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-18 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r325650441
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapSnapshot.java
 ##
 @@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.util.ResourceGuard;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * This class represents the snapshot of a {@link CopyOnWriteSkipListStateMap}.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMapSnapshot
+   extends StateMapSnapshot> 
{
+
+   /**
+* Version of the {@link CopyOnWriteSkipListStateMap} when this 
snapshot was created. This can be used to release the snapshot.
+*/
+   private final int snapshotVersion;
+
+   /** The number of (non-null) entries in snapshotData. */
+   @Nonnegative
+   private final int numberOfEntriesInSnapshotData;
+
+   /**
+* This lease protects the state map resources.
+*/
+   private final ResourceGuard.Lease lease;
+
+   /**
+* Creates a new {@link CopyOnWriteSkipListStateMap}.
+*
+* @param owningStateMap the {@link CopyOnWriteSkipListStateMap} for 
which this object represents a snapshot.
+* @param lease the lease protects the state map resources.
+*/
+   CopyOnWriteSkipListStateMapSnapshot(
+   CopyOnWriteSkipListStateMap owningStateMap,
+   ResourceGuard.Lease lease) {
+   super(owningStateMap);
+
+   this.snapshotVersion = owningStateMap.getStateMapVersion();
+   this.numberOfEntriesInSnapshotData = owningStateMap.size();
+   this.lease = lease;
+   }
+
+   /**
+* Returns the internal version of the map when this snapshot was created.
+*/
+   int getSnapshotVersion() {
+   return snapshotVersion;
+   }
+
+   @Override
+   public void release() {
+   owningStateMap.releaseSnapshot(this);
+   lease.close();
+   }
+
+   @Override
+   public void writeState(
+   TypeSerializer keySerializer,
+   TypeSerializer namespaceSerializer,
+   TypeSerializer stateSerializer,
+   @Nonnull DataOutputView dov,
+   @Nullable StateSnapshotTransformer stateSnapshotTransformer) 
throws IOException {
+   if (stateSnapshotTransformer == null) {
+   writeStateWithNoTransform(dov);
+   } else {
+   writeStateWithTransform(stateSerializer, dov, 
stateSnapshotTransformer);
+   }
+   }
+
+   private void writeStateWithNoTransform(@Nonnull DataOutputView dov) 
throws IOException {
+   dov.writeInt(numberOfEntriesInSnapshotData);
+   SnapshotNodeIterator nodeIterator = new 
SnapshotNodeIterator(true);
+   while (nodeIterator.hasNext()) {
+   Tuple2 tuple = nodeIterator.next();
+   writeKeyAndNamespace(tuple.f0, dov);
+   writeValue(tuple.f1, dov);
+   }
+   }
+
+   private void writeStateWithTransform(
+   TypeSerializer stateSerializer,
+   @Nonnull 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-09-18 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r325646539
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapSnapshot.java
 ##
 @@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.util.ResourceGuard;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * This class represents the snapshot of a {@link CopyOnWriteSkipListStateMap}.
+ *
+ * @param  type of key
+ * @param  type of namespace
+ * @param  type of state
+ */
+public class CopyOnWriteSkipListStateMapSnapshot
+   extends StateMapSnapshot> 
{
+
+   /**
+* Version of the {@link CopyOnWriteSkipListStateMap} when this 
snapshot was created. This can be used to release the snapshot.
+*/
+   private final int snapshotVersion;
+
+   /** The number of (non-null) entries in snapshotData. */
+   @Nonnegative
+   private final int numberOfEntriesInSnapshotData;
+
+   /**
+* This lease protects the state map resources.
+*/
+   private final ResourceGuard.Lease lease;
+
+   /**
+* Creates a new {@link CopyOnWriteSkipListStateMap}.
+*
+* @param owningStateMap the {@link CopyOnWriteSkipListStateMap} for 
which this object represents a snapshot.
+* @param lease the lease protects the state map resources.
+*/
+   CopyOnWriteSkipListStateMapSnapshot(
+   CopyOnWriteSkipListStateMap owningStateMap,
+   ResourceGuard.Lease lease) {
+   super(owningStateMap);
+
+   this.snapshotVersion = owningStateMap.getStateMapVersion();
+   this.numberOfEntriesInSnapshotData = owningStateMap.size();
+   this.lease = lease;
+   }
+
+   /**
+* Returns the internal version of the map when this snapshot was created.
+*/
+   int getSnapshotVersion() {
+   return snapshotVersion;
+   }
+
+   @Override
+   public void release() {
+   owningStateMap.releaseSnapshot(this);
+   lease.close();
+   }
+
+   @Override
+   public void writeState(
+   TypeSerializer keySerializer,
+   TypeSerializer namespaceSerializer,
+   TypeSerializer stateSerializer,
+   @Nonnull DataOutputView dov,
+   @Nullable StateSnapshotTransformer stateSnapshotTransformer) 
throws IOException {
+   if (stateSnapshotTransformer == null) {
+   writeStateWithNoTransform(dov);
+   } else {
+   writeStateWithTransform(stateSerializer, dov, 
stateSnapshotTransformer);
+   }
+   }
+
+   private void writeStateWithNoTransform(@Nonnull DataOutputView dov) 
throws IOException {
+   dov.writeInt(numberOfEntriesInSnapshotData);
+   SnapshotNodeIterator nodeIterator = new 
SnapshotNodeIterator(true);
+   while (nodeIterator.hasNext()) {
+   Tuple2 tuple = nodeIterator.next();
+   writeKeyAndNamespace(tuple.f0, dov);
+   writeValue(tuple.f1, dov);
+   }
+   }
+
+   private void writeStateWithTransform(
+   TypeSerializer stateSerializer,
+   @Nonnull 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-08-30 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r319518588
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/core/memory/ByteBufferUtilsTest.java
 ##
 @@ -0,0 +1,101 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.core.memory;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+
+/**
+ * Tests for {@link ByteBufferUtils}.
+ */
+public class ByteBufferUtilsTest {
+
+   @Test
+   public void testBBWriteAndRead() {
+
+   ByteBuffer bb = ByteBuffer.allocateDirect(4096);
+   doTest(bb, 0);
+   doTest(bb, 1);
+   doTest(bb, 2);
+   doTest(bb, 3);
+   doTest(bb, 4);
+   doTest(bb, 5);
+   doTest(bb, 6);
+   doTest(bb, 7);
+
+   bb = ByteBuffer.allocate(4096);
+   doTest(bb, 0);
+   doTest(bb, 1);
+   doTest(bb, 2);
+   doTest(bb, 3);
+   doTest(bb, 4);
+   doTest(bb, 5);
+   doTest(bb, 6);
+   doTest(bb, 7);
+   }
+
+   @Test
+   public void testCompareHeapBufferWithDirectBuffer() {
+   byte[] bytes = new byte[]{'a', 'b', 'c', 'd', 'e'};
+   final int len = bytes.length;
+   ByteBuffer heapBuffer = ByteBuffer.wrap(bytes);
+   ByteBuffer directBuffer = ByteBuffer.allocateDirect(len);
+   directBuffer.put(bytes);
+   int res = ByteBufferUtils.compareTo(heapBuffer, 1, len - 1, 
directBuffer, 1, len - 1);
+   Assert.assertThat(res, is(0));
+   res = ByteBufferUtils.compareTo(heapBuffer, 0, len - 1, 
directBuffer, 1, len - 1);
+   Assert.assertThat(res, lessThan(0));
+   res = ByteBufferUtils.compareTo(heapBuffer, 1, len - 1, 
directBuffer, 0, len - 1);
+   Assert.assertThat(res, greaterThan(0));
+   }
+
+   private void doTest(ByteBuffer bb, int offset) {
+   int positionOri = bb.position();
+
+   ByteBufferUtils.putInt(bb, offset, 123);
+   Assert.assertEquals(bb.position(), positionOri);
+   Assert.assertEquals(123, ByteBufferUtils.toInt(bb, offset));
+   Assert.assertEquals(bb.position(), positionOri);
+
+   ByteBufferUtils.putLong(bb, offset + 4, 1234);
+   Assert.assertEquals(bb.position(), positionOri);
+   Assert.assertEquals(1234, ByteBufferUtils.toLong(bb, offset + 
4));
+   Assert.assertEquals(bb.position(), positionOri);
+
+   Assert.assertEquals(123, ByteBufferUtils.toInt(bb, offset));
+   Assert.assertEquals(bb.position(), positionOri);
+
+   ByteBuffer bb2 = ByteBuffer.allocate(12);
+   int positionOri2 = bb2.position();
+   ByteBufferUtils.copyFromBufferToBuffer(bb, offset, bb2, 0, 12);
+
+   Assert.assertEquals(ByteBufferUtils.toInt(bb2, 0), 123);
+   Assert.assertEquals(ByteBufferUtils.toLong(bb2, 4), 1234);
+   Assert.assertEquals(bb.position(), positionOri);
+   Assert.assertEquals(bb2.position(), positionOri2);
 
 Review comment:
   This case is designed for testing composite operations, will add some 
comments to help understand the logic.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-08-29 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r319333262
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/memory/UnsafeHelp.java
 ##
 @@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.misc.Unsafe;
+
+import javax.annotation.Nonnull;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * Unsafe use help.
+ */
+public class UnsafeHelp {
 
 Review comment:
   Yes, mainly referred to HBase's 
[UnsafeAccess](https://github.com/apache/hbase/blob/master/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java),
 will add a comment to mention the source.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-08-29 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r319071340
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/ByteBufferInputStreamWithPos.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Un-synchronized input stream using the given byte buffer.
+ */
+public class ByteBufferInputStreamWithPos extends InputStream {
 
 Review comment:
   Here is the result of JMH benchmark, where "Orig" means the original 
implementation of `ByteArrayInputStreamWithPos` and "New" means the new one 
rebased on `ByteBufferInputStreamWithPos`. From the result we could see the 
initialization will indeed take more time due to constructing `HeapByteBuffer` 
but there's no regression on `read` functions, so I think the real impact of 
the refactor is ok.
   
   ```
   # Run complete. Total time: 00:06:08
   
   Benchmark  Mode  Cnt   Score 
 Error   Units
   ByteArrayInputStreamBenchmark.testInitNew thrpt   30   51998.302 
± 2460.797  ops/ms
   ByteArrayInputStreamBenchmark.testInitOrigthrpt   30  102891.299 
± 6027.684  ops/ms
   ByteArrayInputStreamBenchmark.testReadIntoBufferNew   thrpt   30   22985.900 
± 1437.231  ops/ms
   ByteArrayInputStreamBenchmark.testReadIntoBufferOrig  thrpt   30   24436.653 
±  297.965  ops/ms
   ByteArrayInputStreamBenchmark.testReadNew thrpt   30   25100.684 
± 1466.822  ops/ms
   ByteArrayInputStreamBenchmark.testReadOrigthrpt   30   26607.799 
±  413.156  ops/ms
   
   # Run complete. Total time: 00:06:09
   
   Benchmark  Mode  Cnt   Score 
  Error   Units
   ByteArrayInputStreamBenchmark.testInitNew thrpt   30   49778.439 
±  2988.952  ops/ms
   ByteArrayInputStreamBenchmark.testInitOrigthrpt   30  100307.654 
± 16369.951  ops/ms
   ByteArrayInputStreamBenchmark.testReadIntoBufferNew   thrpt   30   22838.932 
±  1315.758  ops/ms
   ByteArrayInputStreamBenchmark.testReadIntoBufferOrig  thrpt   30   22266.972 
±  1438.928  ops/ms
   ByteArrayInputStreamBenchmark.testReadNew thrpt   30   25849.028 
±  1521.414  ops/ms
   ByteArrayInputStreamBenchmark.testReadOrigthrpt   30   25672.482 
±  1196.857  ops/ms
   
   # Run complete. Total time: 00:06:07
   
   Benchmark  Mode  Cnt   Score 
 Error   Units
   ByteArrayInputStreamBenchmark.testInitNew thrpt   30   53944.255 
± 1009.925  ops/ms
   ByteArrayInputStreamBenchmark.testInitOrigthrpt   30  117808.909 
± 2655.215  ops/ms
   ByteArrayInputStreamBenchmark.testReadIntoBufferNew   thrpt   30   24388.177 
±  346.684  ops/ms
   ByteArrayInputStreamBenchmark.testReadIntoBufferOrig  thrpt   30   24491.455 
±  323.040  ops/ms
   ByteArrayInputStreamBenchmark.testReadNew thrpt   30   27081.540 
±  471.671  ops/ms
   ByteArrayInputStreamBenchmark.testReadOrigthrpt   30   26694.972 
±  348.197  ops/ms
   ```
   
   Attached is the patch of the benchmark, based on the current flink-benchmark 
master branch. Command used to run the benchmark is `mvn 
-Dflink.version=1.10-SNAPSHOT clean package exec:exec -Dexec.executable=java 
-Dexec.args="-jar target/benchmarks.jar -rf csv 
org.apache.flink.benchmark.memory.*"
   
   
[bytearrayinputstream.benchmark.patch](https://github.com/apache/flink/files/3555750/bytearrayinputstream.benchmark.patch.txt)
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-08-29 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r319071340
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/ByteBufferInputStreamWithPos.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Un-synchronized input stream using the given byte buffer.
+ */
+public class ByteBufferInputStreamWithPos extends InputStream {
 
 Review comment:
   Here is the result of JMH benchmark, where "Orig" means the original 
implementation of `ByteArrayInputStreamWithPos` and "New" means the new one 
rebased on `ByteBufferInputStreamWithPos`. From the result we could see the 
initialization will indeed take more time due to constructing `HeapByteBuffer` 
but there's no regression on `read` functions, so I think the real impact of 
the refactor is ok.
   
   ```
   # Run complete. Total time: 00:06:08
   
   Benchmark  Mode  Cnt   Score 
 Error   Units
   ByteArrayInputStreamBenchmark.testInitNew thrpt   30   51998.302 
± 2460.797  ops/ms
   ByteArrayInputStreamBenchmark.testInitOrigthrpt   30  102891.299 
± 6027.684  ops/ms
   ByteArrayInputStreamBenchmark.testReadIntoBufferNew   thrpt   30   22985.900 
± 1437.231  ops/ms
   ByteArrayInputStreamBenchmark.testReadIntoBufferOrig  thrpt   30   24436.653 
±  297.965  ops/ms
   ByteArrayInputStreamBenchmark.testReadNew thrpt   30   25100.684 
± 1466.822  ops/ms
   ByteArrayInputStreamBenchmark.testReadOrigthrpt   30   26607.799 
±  413.156  ops/ms
   
   # Run complete. Total time: 00:06:09
   
   Benchmark  Mode  Cnt   Score 
  Error   Units
   ByteArrayInputStreamBenchmark.testInitNew thrpt   30   49778.439 
±  2988.952  ops/ms
   ByteArrayInputStreamBenchmark.testInitOrigthrpt   30  100307.654 
± 16369.951  ops/ms
   ByteArrayInputStreamBenchmark.testReadIntoBufferNew   thrpt   30   22838.932 
±  1315.758  ops/ms
   ByteArrayInputStreamBenchmark.testReadIntoBufferOrig  thrpt   30   22266.972 
±  1438.928  ops/ms
   ByteArrayInputStreamBenchmark.testReadNew thrpt   30   25849.028 
±  1521.414  ops/ms
   ByteArrayInputStreamBenchmark.testReadOrigthrpt   30   25672.482 
±  1196.857  ops/ms
   
   # Run complete. Total time: 00:06:07
   
   Benchmark  Mode  Cnt   Score 
 Error   Units
   ByteArrayInputStreamBenchmark.testInitNew thrpt   30   53944.255 
± 1009.925  ops/ms
   ByteArrayInputStreamBenchmark.testInitOrigthrpt   30  117808.909 
± 2655.215  ops/ms
   ByteArrayInputStreamBenchmark.testReadIntoBufferNew   thrpt   30   24388.177 
±  346.684  ops/ms
   ByteArrayInputStreamBenchmark.testReadIntoBufferOrig  thrpt   30   24491.455 
±  323.040  ops/ms
   ByteArrayInputStreamBenchmark.testReadNew thrpt   30   27081.540 
±  471.671  ops/ms
   ByteArrayInputStreamBenchmark.testReadOrigthrpt   30   26694.972 
±  348.197  ops/ms
   ```
   
   Attached is the patch of the benchmark, based on the current flink-benchmark 
master branch. Command used to run the benchmark is `mvn 
-Dflink.version=1.10-SNAPSHOT clean package exec:exec -Dexec.executable=java 
-Dexec.args="-jar target/benchmarks.jar -rf csv 
org.apache.flink.benchmark.memory.*"


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-08-28 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r318412154
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still used by 
unreleased snapshots.
+*/
+   private final TreeSet snapshotVersions;
+
+   /**
+* The size of skip list 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-08-28 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r318412223
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still used by 
unreleased snapshots.
+*/
+   private final TreeSet snapshotVersions;
+
+   /**
+* The size of skip list 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-08-27 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r318379606
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still used by 
unreleased snapshots.
+*/
+   private final TreeSet snapshotVersions;
+
+   /**
+* The size of skip list 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-08-27 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r318379352
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still used by 
unreleased snapshots.
+*/
+   private final TreeSet snapshotVersions;
+
+   /**
+* The size of skip list 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-08-27 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r318375048
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still used by 
unreleased snapshots.
+*/
+   private final TreeSet snapshotVersions;
+
+   /**
+* The size of skip list 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-08-27 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r318374178
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/ByteBufferInputStreamWithPos.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Un-synchronized input stream using the given byte buffer.
+ */
+public class ByteBufferInputStreamWithPos extends InputStream {
 
 Review comment:
   Will move `ByteBufferInputStreamWithPos` into flink-core and rebase 
`ByteArrayInputStreamWithPos` based on it. I think the performance impact of 
`ByteBuffer.wrap` should be ok since the main cost is to construct a 
`HeapByteBuffer` instance, but will use JMH to identify the details.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-08-27 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r317920996
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/ByteBufferUtils.java
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import javax.annotation.Nonnull;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities to get/put data to {@link ByteBuffer}. All methods don't change
+ * byte buffer's position.
+ */
+@SuppressWarnings({"WeakerAccess", "unused", "UnusedReturnValue"})
+public class ByteBufferUtils {
+
+   private static final boolean UNSAFE_AVAIL = UnsafeHelp.isAvailable();
+   private static final boolean UNSAFE_UNALIGNED = UnsafeHelp.unaligned();
+   private static final Field ACCESS_FIELD;
+
+   static {
+   try {
+   ACCESS_FIELD = 
java.nio.Buffer.class.getDeclaredField("address");
+   ACCESS_FIELD.setAccessible(true);
+   } catch (NoSuchFieldException e) {
+   throw new RuntimeException("Failed to get address 
method from java.nio.Buffer", e);
+   }
+   }
+
+   /**
+* Reads an int value at the given buffer's offset.
+*
+* @param buffer the given buffer
+* @param offset the given buffer's offset
+* @return int value at offset
+*/
+   public static int toInt(ByteBuffer buffer, int offset) {
+   if (UNSAFE_UNALIGNED) {
+   return UnsafeHelp.toInt(buffer, offset);
+   } else {
+   return buffer.getInt(offset);
+   }
+   }
+
+   /**
+* Reads a long value at the given buffer's offset.
+*
+* @param buffer the given buffer
+* @param offset the given buffer's offset
+* @return long value at offset
+*/
+   public static long toLong(ByteBuffer buffer, int offset) {
+   if (UNSAFE_UNALIGNED) {
+   return UnsafeHelp.toLong(buffer, offset);
+   } else {
+   return buffer.getLong(offset);
+   }
+   }
+
+   /**
+* Reads a short value at the given buffer's offset.
+*
+* @param buffer the given buffer
+* @param offset the given buffer's offset
+* @return short value at offset
+*/
+   public static short toShort(ByteBuffer buffer, int offset) {
+   if (UNSAFE_UNALIGNED) {
+   return UnsafeHelp.toShort(buffer, offset);
+   } else {
+   return buffer.getShort(offset);
+   }
+   }
+
+   public static byte toByte(ByteBuffer buffer, int offset) {
+   if (UnsafeHelp.isAvailable()) {
+   return UnsafeHelp.toByte(buffer, offset);
+   } else {
+   return buffer.get(offset);
+   }
+   }
+
+   public static void putInt(ByteBuffer buffer, int index, int val) {
+   if (UNSAFE_UNALIGNED) {
+   UnsafeHelp.putInt(buffer, index, val);
+   } else {
+   buffer.putInt(index, val);
+   }
+   }
+
+   public static void putLong(ByteBuffer buffer, int index, long val) {
+   if (UNSAFE_UNALIGNED) {
+   UnsafeHelp.putLong(buffer, index, val);
+   } else {
+   buffer.putLong(index, val);
+   }
+   }
+
+   /**
+* Copy from one buffer to another from given offset. This will be 
absolute positional copying and
+* won't affect the position of any of the buffers.
+*
+* @param inthe given buffer to read
+* @param out   the given buffer of destination
+* @param sourceOffset  the given buffer's offset of src
+* @param destinationOffset the 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-08-26 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r317895384
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/ByteBufferUtils.java
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import javax.annotation.Nonnull;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities to get/put data to {@link ByteBuffer}. All methods don't change
+ * byte buffer's position.
+ */
+@SuppressWarnings({"WeakerAccess", "unused", "UnusedReturnValue"})
 
 Review comment:
   Will move this class to `org.apache.flink.core.memory` in flink-core, then 
we don't need the `WeakerAccess` warning suppress anymore. The current "unused" 
method might be used later since this is only part of the whole work, will add 
test case to cover it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-08-26 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r317895515
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/ByteBufferUtils.java
 ##
 @@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import javax.annotation.Nonnull;
+
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities to get/put data to {@link ByteBuffer}. All methods don't change
+ * byte buffer's position.
+ */
+@SuppressWarnings({"WeakerAccess", "unused", "UnusedReturnValue"})
+public class ByteBufferUtils {
+
+   private static final boolean UNSAFE_AVAIL = UnsafeHelp.isAvailable();
+   private static final boolean UNSAFE_UNALIGNED = UnsafeHelp.unaligned();
+   private static final Field ACCESS_FIELD;
+
+   static {
+   try {
+   ACCESS_FIELD = 
java.nio.Buffer.class.getDeclaredField("address");
+   ACCESS_FIELD.setAccessible(true);
+   } catch (NoSuchFieldException e) {
+   throw new RuntimeException("Failed to get address 
method from java.nio.Buffer", e);
+   }
+   }
+
+   /**
+* Reads an int value at the given buffer's offset.
+*
+* @param buffer the given buffer
+* @param offset the given buffer's offset
+* @return int value at offset
+*/
+   public static int toInt(ByteBuffer buffer, int offset) {
+   if (UNSAFE_UNALIGNED) {
+   return UnsafeHelp.toInt(buffer, offset);
+   } else {
+   return buffer.getInt(offset);
+   }
+   }
+
+   /**
+* Reads a long value at the given buffer's offset.
+*
+* @param buffer the given buffer
+* @param offset the given buffer's offset
+* @return long value at offset
+*/
+   public static long toLong(ByteBuffer buffer, int offset) {
+   if (UNSAFE_UNALIGNED) {
+   return UnsafeHelp.toLong(buffer, offset);
+   } else {
+   return buffer.getLong(offset);
+   }
+   }
+
+   /**
+* Reads a short value at the given buffer's offset.
+*
+* @param buffer the given buffer
+* @param offset the given buffer's offset
+* @return short value at offset
+*/
+   public static short toShort(ByteBuffer buffer, int offset) {
+   if (UNSAFE_UNALIGNED) {
+   return UnsafeHelp.toShort(buffer, offset);
+   } else {
+   return buffer.getShort(offset);
+   }
+   }
+
+   public static byte toByte(ByteBuffer buffer, int offset) {
+   if (UnsafeHelp.isAvailable()) {
+   return UnsafeHelp.toByte(buffer, offset);
+   } else {
+   return buffer.get(offset);
+   }
+   }
+
+   public static void putInt(ByteBuffer buffer, int index, int val) {
+   if (UNSAFE_UNALIGNED) {
+   UnsafeHelp.putInt(buffer, index, val);
+   } else {
+   buffer.putInt(index, val);
+   }
+   }
+
+   public static void putLong(ByteBuffer buffer, int index, long val) {
+   if (UNSAFE_UNALIGNED) {
+   UnsafeHelp.putLong(buffer, index, val);
+   } else {
+   buffer.putLong(index, val);
+   }
+   }
+
+   /**
+* Copy from one buffer to another from given offset. This will be 
absolute positional copying and
+* won't affect the position of any of the buffers.
+*
+* @param inthe given buffer to read
+* @param out   the given buffer of destination
+* @param sourceOffset  the given buffer's offset of src
+* @param destinationOffset the 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-08-26 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r317887398
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/ByteBufferInputStreamWithPos.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
 
 Review comment:
   Agree that we'd better use another package name. How about we rename the 
package as the last step after all review comments are addressed, just to make 
the review more convenient?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-08-23 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r317013585
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still used by 
unreleased snapshots.
+*/
+   private final TreeSet snapshotVersions;
+
+   /**
+* The size of skip list 

[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend

2019-08-23 Thread GitBox
carp84 commented on a change in pull request #9501: [FLINK-12697] [State 
Backends] Support on-disk state storage for spill-able heap backend
URL: https://github.com/apache/flink/pull/9501#discussion_r317013585
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
 ##
 @@ -0,0 +1,1505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StateEntry;
+import org.apache.flink.runtime.state.StateTransformationFunction;
+import org.apache.flink.runtime.state.heap.space.Allocator;
+import org.apache.flink.runtime.state.heap.space.Chunk;
+import org.apache.flink.runtime.state.heap.space.SpaceUtils;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ResourceGuard;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterators;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE;
+import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE;
+import static 
org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER;
+
+/**
+ * Implementation of state map which is based on skip list with copy-on-write 
support. states will
+ * be serialized to bytes and stored in the space allocated with the given 
allocator.
+ */
+public class CopyOnWriteSkipListStateMap extends StateMap 
implements AutoCloseable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class);
+
+   /**
+* Default max number of logically-removed keys to delete one time.
+*/
+   private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3;
+
+   /**
+* Default ratio of the logically-removed keys to trigger deletion when 
snapshot.
+*/
+   private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f;
+
+   /**
+* The serializer used to serialize the key and namespace to bytes 
stored in skip list.
+*/
+   private final SkipListKeySerializer skipListKeySerializer;
+
+   /**
+* The serializer used to serialize the state to bytes stored in skip 
list.
+*/
+   private final SkipListValueSerializer skipListValueSerializer;
+
+   /**
+* Space allocator.
+*/
+   private final Allocator spaceAllocator;
+
+   /**
+* The level index header.
+*/
+   private final LevelIndexHeader levelIndexHeader;
+
+   /**
+* Seed to generate random index level.
+*/
+   private int randomSeed;
+
+   /**
+* The current version of this map. Used for copy-on-write mechanics.
+*/
+   private int stateMapVersion;
+
+   /**
+* The highest version of this map that is still required by any 
unreleased snapshot.
+*/
+   private int highestRequiredSnapshotVersion;
+
+   /**
+* Snapshots no more than this version must have been finished, but 
there may be some
+* snapshots more than this version are still running.
+*/
+   private volatile int highestFinishedSnapshotVersion;
+
+   /**
+* Maintains an ordered set of version ids that are still used by 
unreleased snapshots.
+*/
+   private final TreeSet snapshotVersions;
+
+   /**
+* The size of skip list