[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334581046 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/SkipListSerializerTest.java ## @@ -0,0 +1,94 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link SkipListKeySerializer}. 
+ */ +public class SkipListSerializerTest extends TestLogger { + + @Test + public void testSkipListKeySerializer() throws IOException { + TypeSerializer keySerializer = StringSerializer.INSTANCE; + TypeSerializer namespaceSerializer = StringSerializer.INSTANCE; + + SkipListKeySerializer skipListKeySerializer = + new SkipListKeySerializer<>(keySerializer, namespaceSerializer); + + for (int i = 0; i < 10; i++) { + String key = "key-abcdedg" + i; + String namespace = "namespace-dfsfdafd" + i; + + byte[] skipListKey = skipListKeySerializer.serialize(key, namespace); + int offset = 10; + byte[] data = new byte[10 + skipListKey.length]; + System.arraycopy(skipListKey, 0, data, offset, skipListKey.length); + ByteBuffer skipListKeyByteBuffer = ByteBuffer.wrap(data); + assertEquals(key, skipListKeySerializer.deserializeKey(skipListKeyByteBuffer, offset, skipListKey.length)); + assertEquals(namespace, skipListKeySerializer.deserializeNamespace(skipListKeyByteBuffer, offset, skipListKey.length)); + + Tuple2 serializedKeyAndNamespace = + skipListKeySerializer.getSerializedKeyAndNamespace(skipListKeyByteBuffer, offset); + assertEquals(key, deserialize(keySerializer, serializedKeyAndNamespace.f0)); + assertEquals(namespace, deserialize(namespaceSerializer, serializedKeyAndNamespace.f1)); + + byte[] serializedNamespace = skipListKeySerializer.serializeNamespace(namespace); + assertEquals(namespace, deserialize(namespaceSerializer, serializedNamespace)); + } + } + + @Test + public void testSkipListValueSerializer() throws IOException { + TypeSerializer stateSerializer = StringSerializer.INSTANCE; + SkipListValueSerializer skipListValueSerializer = + new SkipListValueSerializer<>(stateSerializer); + + for (int i = 0; i < 10; i++) { Review comment: Ditto. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334581016 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/SkipListSerializerTest.java ## @@ -0,0 +1,94 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for {@link SkipListKeySerializer}. 
+ */ +public class SkipListSerializerTest extends TestLogger { + + @Test + public void testSkipListKeySerializer() throws IOException { + TypeSerializer keySerializer = StringSerializer.INSTANCE; + TypeSerializer namespaceSerializer = StringSerializer.INSTANCE; + + SkipListKeySerializer skipListKeySerializer = + new SkipListKeySerializer<>(keySerializer, namespaceSerializer); + + for (int i = 0; i < 10; i++) { Review comment: Yes, will refactor the test to better reflect this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334180670 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -115,577 +144,783 @@ public void testInitStateMap() { assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext()); stateMap.close(); - assertEquals(0, stateMap.size()); - assertEquals(0, stateMap.totalSize()); - assertTrue(stateMap.isClosed()); } /** -* Test basic operations. +* Test state put operation. */ @Test - public void testBasicOperations() throws Exception { - TypeSerializer keySerializer = IntSerializer.INSTANCE; - TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; - TypeSerializer stateSerializer = StringSerializer.INSTANCE; - CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( - keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + public void testPutState() { + testWithFunction((totalSize, stateMap, referenceStates) -> getDefaultSizes(totalSize)); + } - ThreadLocalRandom random = ThreadLocalRandom.current(); - // map to store expected states, namespace -> key -> state - Map> referenceStates = new HashMap<>(); - int totalSize = 0; + /** +* Test remove existing state. +*/ + @Test + public void testRemoveExistingState() { + testRemoveState(false, false); + } - // put some states - for (long namespace = 0; namespace < 10; namespace++) { - for (int key = 0; key < 100; key++) { - totalSize++; - String state = String.valueOf(key * namespace); - if (random.nextBoolean()) { - stateMap.put(key, namespace, state); - } else { - assertNull(stateMap.putAndGetOld(key, namespace, state)); + /** +* Test remove and get existing state. 
+*/ + @Test + public void testRemoveAndGetExistingState() { + testRemoveState(false, true); + } + + /** +* Test remove absent state. +*/ + @Test + public void testRemoveAbsentState() { + testRemoveState(true, true); + } + + /** +* Test remove previously removed state. +*/ + @Test + public void testPutPreviouslyRemovedState() { + testWithFunction( + (totalSize, stateMap, referenceStates) -> applyFunctionAfterRemove(stateMap, referenceStates, + (removedCnt, removedStates) -> { + int size = totalSize - removedCnt; + for (Map.Entry> entry : removedStates.entrySet()) { + long namespace = entry.getKey(); + for (int key : entry.getValue()) { + size++; + String state = String.valueOf(key * namespace); + assertNull(stateMap.putAndGetOld(key, namespace, state)); + referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, String.valueOf(state)); + } + } + return getDefaultSizes(size); } - referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, state); - assertEquals(totalSize, stateMap.size()); - assertEquals(totalSize, stateMap.totalSize()); - } - } + ) + ); + } - // validates space allocation. Each pair need 2 spaces - assertEquals(totalSize * 2, spaceAllocator.getTotalSpaceNumber()); - verifyState(referenceStates, stateMap); + private void testRemoveState(boolean removeAbsent, boolean getOld) { + testWithFunction( + (totalSize, stateMap, referenceStates) -> { + if (removeAbsent) { +
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334274093 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -115,577 +144,783 @@ public void testInitStateMap() { assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext()); stateMap.close(); - assertEquals(0, stateMap.size()); - assertEquals(0, stateMap.totalSize()); - assertTrue(stateMap.isClosed()); } /** -* Test basic operations. +* Test state put operation. */ @Test - public void testBasicOperations() throws Exception { - TypeSerializer keySerializer = IntSerializer.INSTANCE; - TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; - TypeSerializer stateSerializer = StringSerializer.INSTANCE; - CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( - keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + public void testPutState() { + testWithFunction((totalSize, stateMap, referenceStates) -> getDefaultSizes(totalSize)); + } - ThreadLocalRandom random = ThreadLocalRandom.current(); - // map to store expected states, namespace -> key -> state - Map> referenceStates = new HashMap<>(); - int totalSize = 0; + /** +* Test remove existing state. +*/ + @Test + public void testRemoveExistingState() { + testRemoveState(false, false); + } - // put some states - for (long namespace = 0; namespace < 10; namespace++) { - for (int key = 0; key < 100; key++) { - totalSize++; - String state = String.valueOf(key * namespace); - if (random.nextBoolean()) { - stateMap.put(key, namespace, state); - } else { - assertNull(stateMap.putAndGetOld(key, namespace, state)); + /** +* Test remove and get existing state. 
+*/ + @Test + public void testRemoveAndGetExistingState() { + testRemoveState(false, true); + } + + /** +* Test remove absent state. +*/ + @Test + public void testRemoveAbsentState() { + testRemoveState(true, true); Review comment: I don't quite agree here. I think for the boolean parameters, it's normal to get their meaning through javadoc or parameter name in method signature. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334180018 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -115,577 +144,783 @@ public void testInitStateMap() { assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext()); stateMap.close(); - assertEquals(0, stateMap.size()); - assertEquals(0, stateMap.totalSize()); - assertTrue(stateMap.isClosed()); } /** -* Test basic operations. +* Test state put operation. */ @Test - public void testBasicOperations() throws Exception { - TypeSerializer keySerializer = IntSerializer.INSTANCE; - TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; - TypeSerializer stateSerializer = StringSerializer.INSTANCE; - CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( - keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + public void testPutState() { + testWithFunction((totalSize, stateMap, referenceStates) -> getDefaultSizes(totalSize)); + } - ThreadLocalRandom random = ThreadLocalRandom.current(); - // map to store expected states, namespace -> key -> state - Map> referenceStates = new HashMap<>(); - int totalSize = 0; + /** +* Test remove existing state. +*/ + @Test + public void testRemoveExistingState() { + testRemoveState(false, false); + } - // put some states - for (long namespace = 0; namespace < 10; namespace++) { - for (int key = 0; key < 100; key++) { - totalSize++; - String state = String.valueOf(key * namespace); - if (random.nextBoolean()) { - stateMap.put(key, namespace, state); - } else { - assertNull(stateMap.putAndGetOld(key, namespace, state)); + /** +* Test remove and get existing state. 
+*/ + @Test + public void testRemoveAndGetExistingState() { + testRemoveState(false, true); + } + + /** +* Test remove absent state. +*/ + @Test + public void testRemoveAbsentState() { + testRemoveState(true, true); + } + + /** +* Test remove previously removed state. +*/ + @Test + public void testPutPreviouslyRemovedState() { + testWithFunction( + (totalSize, stateMap, referenceStates) -> applyFunctionAfterRemove(stateMap, referenceStates, + (removedCnt, removedStates) -> { + int size = totalSize - removedCnt; + for (Map.Entry> entry : removedStates.entrySet()) { + long namespace = entry.getKey(); + for (int key : entry.getValue()) { + size++; + String state = String.valueOf(key * namespace); + assertNull(stateMap.putAndGetOld(key, namespace, state)); + referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, String.valueOf(state)); + } + } + return getDefaultSizes(size); } - referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, state); - assertEquals(totalSize, stateMap.size()); - assertEquals(totalSize, stateMap.totalSize()); - } - } + ) + ); Review comment: Let me try to improve... It seems sometimes we suggest to use functions to reduce duplicated codes and the other cases we are against it. Hopefully I could get better known about the standard/convention. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334227853 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Allocator.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.space; + +import java.io.Closeable; + +/** + * Implementations are responsible for allocate space. + */ +public interface Allocator extends Closeable { + + /** +* Allocate space with the given size. +* +* @param size size of space to allocate. +* @return address of the allocated space, or -1 when allocation is failed. +*/ + long allocate(int size); Review comment: ok, got the question, and yes we should better handle this part, will resolve this in new commit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334193967 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -590,10 +542,9 @@ private int compareByteBufferAndNode(ByteBuffer keyByteBuffer, int keyOffset, in * equal to, or greater than the second. */ private int compareNamespaceAndNode(ByteBuffer namespaceByteBuffer, int namespaceOffset, int namespaceLen, long targetNode) { - Chunk chunk = spaceAllocator.getChunkById(SpaceUtils.getChunkIdByAddress(targetNode)); - int offsetInChunk = SpaceUtils.getChunkOffsetByAddress(targetNode); - ByteBuffer targetKeyByteBuffer = chunk.getByteBuffer(offsetInChunk); - int offsetInByteBuffer = chunk.getOffsetInByteBuffer(offsetInChunk); + Tuple2 tuple2 = getNodeByteBufferAndOffset(targetNode); + ByteBuffer targetKeyByteBuffer = tuple2.f0; + int offsetInByteBuffer = tuple2.f1; Review comment: I think the class is just for improving the readability and I really don't want to pass the object everywhere and keep it alive for so long. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334193401 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -1136,10 +1088,9 @@ S helpGetState(long valuePointer, SkipListValueSerializer serializer) { return null; } - Chunk chunk = spaceAllocator.getChunkById(SpaceUtils.getChunkIdByAddress(valuePointer)); - int offsetInChunk = SpaceUtils.getChunkOffsetByAddress(valuePointer); - ByteBuffer bb = chunk.getByteBuffer(offsetInChunk); - int offsetInByteBuffer = chunk.getOffsetInByteBuffer(offsetInChunk); + Tuple2 tuple2 = getNodeByteBufferAndOffset(valuePointer); + ByteBuffer bb = tuple2.f0; + int offsetInByteBuffer = tuple2.f1; Review comment: I think the class is just for improving the readability and I really don't want to pass the object everywhere and keep it alive for so long. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334190093 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -472,6 +397,74 @@ S putNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen, byte[] value, boo * @return the old state. Null will be returned if key does not exist or returnOldState is false. */ private S removeNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen, boolean returnOldState) { + Tuple4 result = iterateAndProcess(keyByteBuffer, keyOffset, keyLen, + (tuple3, isRemoved) -> { + long prevNode = tuple3.f0; + long currentNode = tuple3.f1; + long nextNode = tuple3.f2; + // if the node has been logically removed, and can not be physically + // removed here, just return null + if (isRemoved && highestRequiredSnapshotVersion != 0) { + return null; + } + + long oldValuePointer; + boolean oldValueNeedFree; + + if (highestRequiredSnapshotVersion == 0) { + // do physically remove only when there is no snapshot running + oldValuePointer = doPhysicalRemoveAndGetValue(currentNode, prevNode, nextNode); + // the node has been logically removed, and remove it from the set + if (isRemoved) { + logicallyRemovedNodes.remove(currentNode); + } + oldValueNeedFree = true; + } else { + int version = SkipListUtils.helpGetNodeLatestVersion(currentNode, spaceAllocator); + if (version < highestRequiredSnapshotVersion) { + // the newest-version value may be used by snapshots, and update it with copy-on-write + oldValuePointer = updateValueWithCopyOnWrite(currentNode, null); + oldValueNeedFree = false; + } else { + // replace the newest-version value. 
+ oldValuePointer = updateValueWithReplace(currentNode, null); + oldValueNeedFree = true; + } + + helpSetNodeStatus(currentNode, NodeStatus.REMOVE); + logicallyRemovedNodes.add(currentNode); + } + + S oldState = null; + if (returnOldState) { + oldState = helpGetState(oldValuePointer); + } + + if (oldValueNeedFree) { + spaceAllocator.free(oldValuePointer); + } + + return oldState; + }); + return result.f2 ? result.f3 : null; + } + + /** +* Iterator the skip list and perform given function. +* +* @param keyByteBuffer byte buffer storing the key. +* @param keyOffset offset of the key. +* @param keyLen length of the key. +* @param function the function to apply when the skip list contains the given key, which accepts two +* parameters: a tuple3 of [previous_node, current_node, next_node] and a boolean indicating +* whether the node with same key has been logically removed, and returns a state. +* @return a tuple4 of [previous_node, current_node, key_found, state_by_applying_function] +*/ + private Tuple4 iterateAndProcess( + ByteBuffer keyByteBuffer, + int keyOffset, + int keyLen, + BiFunction, Boolean, S> function) { Review comment: Maybe leaving the boolean out is better, please check the updated codes and let me know how it looks. This is an automated message from the Apache Git
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334189729 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -472,6 +397,74 @@ S putNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen, byte[] value, boo * @return the old state. Null will be returned if key does not exist or returnOldState is false. */ private S removeNode(ByteBuffer keyByteBuffer, int keyOffset, int keyLen, boolean returnOldState) { + Tuple4 result = iterateAndProcess(keyByteBuffer, keyOffset, keyLen, + (tuple3, isRemoved) -> { + long prevNode = tuple3.f0; + long currentNode = tuple3.f1; + long nextNode = tuple3.f2; + // if the node has been logically removed, and can not be physically + // removed here, just return null + if (isRemoved && highestRequiredSnapshotVersion != 0) { + return null; + } + + long oldValuePointer; + boolean oldValueNeedFree; + + if (highestRequiredSnapshotVersion == 0) { + // do physically remove only when there is no snapshot running + oldValuePointer = doPhysicalRemoveAndGetValue(currentNode, prevNode, nextNode); + // the node has been logically removed, and remove it from the set + if (isRemoved) { + logicallyRemovedNodes.remove(currentNode); + } + oldValueNeedFree = true; + } else { + int version = SkipListUtils.helpGetNodeLatestVersion(currentNode, spaceAllocator); + if (version < highestRequiredSnapshotVersion) { + // the newest-version value may be used by snapshots, and update it with copy-on-write + oldValuePointer = updateValueWithCopyOnWrite(currentNode, null); + oldValueNeedFree = false; + } else { + // replace the newest-version value. 
+ oldValuePointer = updateValueWithReplace(currentNode, null); + oldValueNeedFree = true; + } + + helpSetNodeStatus(currentNode, NodeStatus.REMOVE); + logicallyRemovedNodes.add(currentNode); + } + + S oldState = null; + if (returnOldState) { + oldState = helpGetState(oldValuePointer); + } + + if (oldValueNeedFree) { + spaceAllocator.free(oldValuePointer); + } + + return oldState; + }); + return result.f2 ? result.f3 : null; + } + + /** +* Iterator the skip list and perform given function. +* +* @param keyByteBuffer byte buffer storing the key. +* @param keyOffset offset of the key. +* @param keyLen length of the key. +* @param function the function to apply when the skip list contains the given key, which accepts two +* parameters: a tuple3 of [previous_node, current_node, next_node] and a boolean indicating +* whether the node with same key has been logically removed, and returns a state. +* @return a tuple4 of [previous_node, current_node, key_found, state_by_applying_function] +*/ + private Tuple4 iterateAndProcess( Review comment: Yes it does, it may break earlier if there's no matching key and key of the iterated node is already bigger than given key, or continue to the end if all existing node keys are smaller than given key. Just check the `if...else if...else` clause at end of the method. This is an automated message from the Apache Git Service. To respon
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334180670 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -115,577 +144,783 @@ public void testInitStateMap() { assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext()); stateMap.close(); - assertEquals(0, stateMap.size()); - assertEquals(0, stateMap.totalSize()); - assertTrue(stateMap.isClosed()); } /** -* Test basic operations. +* Test state put operation. */ @Test - public void testBasicOperations() throws Exception { - TypeSerializer keySerializer = IntSerializer.INSTANCE; - TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; - TypeSerializer stateSerializer = StringSerializer.INSTANCE; - CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( - keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + public void testPutState() { + testWithFunction((totalSize, stateMap, referenceStates) -> getDefaultSizes(totalSize)); + } - ThreadLocalRandom random = ThreadLocalRandom.current(); - // map to store expected states, namespace -> key -> state - Map> referenceStates = new HashMap<>(); - int totalSize = 0; + /** +* Test remove existing state. +*/ + @Test + public void testRemoveExistingState() { + testRemoveState(false, false); + } - // put some states - for (long namespace = 0; namespace < 10; namespace++) { - for (int key = 0; key < 100; key++) { - totalSize++; - String state = String.valueOf(key * namespace); - if (random.nextBoolean()) { - stateMap.put(key, namespace, state); - } else { - assertNull(stateMap.putAndGetOld(key, namespace, state)); + /** +* Test remove and get existing state. 
+*/ + @Test + public void testRemoveAndGetExistingState() { + testRemoveState(false, true); + } + + /** +* Test remove absent state. +*/ + @Test + public void testRemoveAbsentState() { + testRemoveState(true, true); + } + + /** +* Test remove previously removed state. +*/ + @Test + public void testPutPreviouslyRemovedState() { + testWithFunction( + (totalSize, stateMap, referenceStates) -> applyFunctionAfterRemove(stateMap, referenceStates, + (removedCnt, removedStates) -> { + int size = totalSize - removedCnt; + for (Map.Entry> entry : removedStates.entrySet()) { + long namespace = entry.getKey(); + for (int key : entry.getValue()) { + size++; + String state = String.valueOf(key * namespace); + assertNull(stateMap.putAndGetOld(key, namespace, state)); + referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, String.valueOf(state)); + } + } + return getDefaultSizes(size); } - referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, state); - assertEquals(totalSize, stateMap.size()); - assertEquals(totalSize, stateMap.totalSize()); - } - } + ) + ); + } - // validates space allocation. Each pair need 2 spaces - assertEquals(totalSize * 2, spaceAllocator.getTotalSpaceNumber()); - verifyState(referenceStates, stateMap); + private void testRemoveState(boolean removeAbsent, boolean getOld) { + testWithFunction( + (totalSize, stateMap, referenceStates) -> { + if (removeAbsent) { +
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334180018 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -115,577 +144,783 @@ public void testInitStateMap() { assertFalse(stateMap.getStateIncrementalVisitor(100).hasNext()); stateMap.close(); - assertEquals(0, stateMap.size()); - assertEquals(0, stateMap.totalSize()); - assertTrue(stateMap.isClosed()); } /** -* Test basic operations. +* Test state put operation. */ @Test - public void testBasicOperations() throws Exception { - TypeSerializer keySerializer = IntSerializer.INSTANCE; - TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; - TypeSerializer stateSerializer = StringSerializer.INSTANCE; - CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( - keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + public void testPutState() { + testWithFunction((totalSize, stateMap, referenceStates) -> getDefaultSizes(totalSize)); + } - ThreadLocalRandom random = ThreadLocalRandom.current(); - // map to store expected states, namespace -> key -> state - Map> referenceStates = new HashMap<>(); - int totalSize = 0; + /** +* Test remove existing state. +*/ + @Test + public void testRemoveExistingState() { + testRemoveState(false, false); + } - // put some states - for (long namespace = 0; namespace < 10; namespace++) { - for (int key = 0; key < 100; key++) { - totalSize++; - String state = String.valueOf(key * namespace); - if (random.nextBoolean()) { - stateMap.put(key, namespace, state); - } else { - assertNull(stateMap.putAndGetOld(key, namespace, state)); + /** +* Test remove and get existing state. 
+*/ + @Test + public void testRemoveAndGetExistingState() { + testRemoveState(false, true); + } + + /** +* Test remove absent state. +*/ + @Test + public void testRemoveAbsentState() { + testRemoveState(true, true); + } + + /** +* Test remove previously removed state. +*/ + @Test + public void testPutPreviouslyRemovedState() { + testWithFunction( + (totalSize, stateMap, referenceStates) -> applyFunctionAfterRemove(stateMap, referenceStates, + (removedCnt, removedStates) -> { + int size = totalSize - removedCnt; + for (Map.Entry> entry : removedStates.entrySet()) { + long namespace = entry.getKey(); + for (int key : entry.getValue()) { + size++; + String state = String.valueOf(key * namespace); + assertNull(stateMap.putAndGetOld(key, namespace, state)); + referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, String.valueOf(state)); + } + } + return getDefaultSizes(size); } - referenceStates.computeIfAbsent(namespace, (none) -> new HashMap<>()).put(key, state); - assertEquals(totalSize, stateMap.size()); - assertEquals(totalSize, stateMap.totalSize()); - } - } + ) + ); Review comment: Let me try to improve... It seems sometimes we suggest to use functions to reduce duplicated codes and the other cases we are against it. Hopefully I could get better known about the standard/convention. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334177724 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -1367,6 +1317,14 @@ public Long next() { } } + private Tuple2 getNodeByteBufferAndOffset(long node) { Review comment: OK, the change was made following the [previous review comment](https://github.com/apache/flink/pull/9501#discussion_r320370445), and I will further improve it. However, I'd like to point out again that such operations are on the core path and will be invoked frequently, which makes the creation of temporary objects a real burden for GC. Right now I cannot tell the detailed performance impact or give concrete numbers since the work is still only halfway (or less) upstreamed, but I will keep an eye on the cost of such a refactoring. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334154082 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -0,0 +1,1448 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link CopyOnWriteSkipListStateMap}. 
+ */ +public class CopyOnWriteSkipListStateMapTest extends TestLogger { + + private TestAllocator spaceAllocator; + + @Before + public void setUp() { + int maxAllocateSize = 256; + spaceAllocator = new TestAllocator(maxAllocateSize); + } + + @After + public void tearDown() { + IOUtils.closeQuietly(spaceAllocator); + } + + /** +* Test initialization of state map. +*/ + @Test + public void testInitStateMap() { + TypeSerializer keySerializer = IntSerializer.INSTANCE; + TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; + TypeSerializer stateSerializer = StringSerializer.INSTANCE; + CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( + keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + + assertTrue(stateMap.isEmpty()); + assertEquals(0, stateMap.size()); + assertEquals(0, stateMap.totalSize()); + assertEquals(0, stateMap.getRequestCount()); + assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty()); + assertEquals(0, stateMap.getHighestRequiredSnapshotVersion()); + assertEquals(0, stateMap.getHighestFinishedSnapshotVersion()); + assertTrue(stateMap.getSnapshotVersions().isEmpty()); + assertTrue(stateMap.getPruningValueNodes().isEmpty()); + assertEquals(0, stateMap.getResourceGuard().getLeaseCount()); + assertFalse(stateMap.getResourceGuard().isClosed()); + assertFalse(stateMap.isClosed()); + + assertNull(stateMap.get(0, 0L)); + assertFalse(stateMap.contai
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334119571 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -0,0 +1,1448 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link CopyOnWriteSkipListStateMap}. 
+ */ +public class CopyOnWriteSkipListStateMapTest extends TestLogger { + + private TestAllocator spaceAllocator; + + @Before + public void setUp() { + int maxAllocateSize = 256; + spaceAllocator = new TestAllocator(maxAllocateSize); + } + + @After + public void tearDown() { + IOUtils.closeQuietly(spaceAllocator); + } + + /** +* Test initialization of state map. +*/ + @Test + public void testInitStateMap() { + TypeSerializer keySerializer = IntSerializer.INSTANCE; + TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; + TypeSerializer stateSerializer = StringSerializer.INSTANCE; + CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( + keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + + assertTrue(stateMap.isEmpty()); + assertEquals(0, stateMap.size()); + assertEquals(0, stateMap.totalSize()); + assertEquals(0, stateMap.getRequestCount()); + assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty()); + assertEquals(0, stateMap.getHighestRequiredSnapshotVersion()); + assertEquals(0, stateMap.getHighestFinishedSnapshotVersion()); + assertTrue(stateMap.getSnapshotVersions().isEmpty()); + assertTrue(stateMap.getPruningValueNodes().isEmpty()); + assertEquals(0, stateMap.getResourceGuard().getLeaseCount()); + assertFalse(stateMap.getResourceGuard().isClosed()); + assertFalse(stateMap.isClosed()); + + assertNull(stateMap.get(0, 0L)); + assertFalse(stateMap.contai
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334116642 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/SpaceUtils.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.space; + +import static org.apache.flink.runtime.state.heap.space.Constants.FOUR_BYTES_BITS; +import static org.apache.flink.runtime.state.heap.space.Constants.FOUR_BYTES_MARK; + +/** + * Utils. + */ +public class SpaceUtils { + + public static int getChunkIdByAddress(long offset) { + return (int) ((offset >>> FOUR_BYTES_BITS) & FOUR_BYTES_MARK); + } + + public static int getChunkOffsetByAddress(long offset) { + return (int) (offset & FOUR_BYTES_MARK); + } +} Review comment: Unified replies/discussions in other comments, close the conversation here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334111093 ## File path: flink-core/src/test/java/org/apache/flink/core/memory/ByteBufferUtilsTest.java ## @@ -0,0 +1,195 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.core.memory; + +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; + +/** + * Tests for {@link ByteBufferUtils}. 
+ */ +public class ByteBufferUtilsTest extends TestLogger { + + @Test + public void testDirectBBWriteAndRead() { + testWithDifferentOffset(true); + } + + @Test + public void testHeapBBWriteAndRead() { + testWithDifferentOffset(false); + } + + @Test + public void testCompareDirectBBToArray() { + testCompareTo(true, false, false); + } + + @Test + public void testCompareDirectBBToDirectBB() { + testCompareTo(true, true, true); + } + + @Test + public void testCompareDirectBBToHeapBB() { + testCompareTo(true, true, false); + } + + @Test + public void testCompareHeapBBToArray() { + testCompareTo(false, false, false); + } + + @Test + public void testCompareHeapBBToDirectBB() { + testCompareTo(false, true, true); + } + + @Test + public void testCompareHeapBBToHeapBB() { + testCompareTo(false, true, false); + } + + private void testCompareTo(boolean isLeftBBDirect, boolean isRightBuffer, boolean isRightDirect) { + testEquals(isLeftBBDirect, isRightBuffer, isRightDirect); + testLessThan(isLeftBBDirect, isRightBuffer, isRightDirect); + testGreaterThan(isLeftBBDirect, isRightBuffer, isRightDirect); + } + + private void testEquals(boolean isLeftBBDirect, boolean isRightBuffer, boolean isRightDirect) { + byte[] leftBufferBytes = new byte[]{'a', 'b', 'c', 'd', 'e'}; + byte[] rightBufferBytes = new byte[]{'b', 'c', 'd', 'e', 'f'}; + ByteBuffer left = isLeftBBDirect + ? ByteBuffer.allocateDirect(leftBufferBytes.length).put(leftBufferBytes) + : ByteBuffer.wrap(leftBufferBytes); + ByteBuffer right = null; + if (isRightBuffer) { + right = isRightDirect + ? ByteBuffer.allocateDirect(rightBufferBytes.length).put(rightBufferBytes) + : ByteBuffer.wrap(rightBufferBytes); + } + if (right != null) { + Assert.assertThat( + ByteBufferUtils.compareTo(left, 1, 4, right, 0, 4), Review comment: We will still need different starting indices if we want to cover all `lessThan`, `equals` and `greaterThan` comparison with two equal arrays, right? 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334106184 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334106138 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334105383 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334087856 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r334087286 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r331758223 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r331757816 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r331757081 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Allocator.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.space; + +import java.io.Closeable; + +/** + * Implementations are responsible for allocate space. + */ +public interface Allocator extends Closeable { + + /** +* Allocate space with the given size. +* +* @param size size of space to allocate. +* @return address of the allocated space, or -1 when allocation is failed. +*/ + long allocate(int size); Review comment: I mean we're checking the return value, not `-1` literally but using a constant `SkipListUtils.NIL_VALUE_POINTER` to represent it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org. With regards, Apache Git Services.
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r329037403 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -0,0 +1,1448 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link CopyOnWriteSkipListStateMap}. 
+ */ +public class CopyOnWriteSkipListStateMapTest extends TestLogger { + + private TestAllocator spaceAllocator; + + @Before + public void setUp() { + int maxAllocateSize = 256; + spaceAllocator = new TestAllocator(maxAllocateSize); + } + + @After + public void tearDown() { + IOUtils.closeQuietly(spaceAllocator); + } + + /** +* Test initialization of state map. +*/ + @Test + public void testInitStateMap() { + TypeSerializer keySerializer = IntSerializer.INSTANCE; + TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; + TypeSerializer stateSerializer = StringSerializer.INSTANCE; + CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( + keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + + assertTrue(stateMap.isEmpty()); + assertEquals(0, stateMap.size()); + assertEquals(0, stateMap.totalSize()); + assertEquals(0, stateMap.getRequestCount()); + assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty()); + assertEquals(0, stateMap.getHighestRequiredSnapshotVersion()); + assertEquals(0, stateMap.getHighestFinishedSnapshotVersion()); + assertTrue(stateMap.getSnapshotVersions().isEmpty()); + assertTrue(stateMap.getPruningValueNodes().isEmpty()); + assertEquals(0, stateMap.getResourceGuard().getLeaseCount()); + assertFalse(stateMap.getResourceGuard().isClosed()); + assertFalse(stateMap.isClosed()); + + assertNull(stateMap.get(0, 0L)); + assertFalse(stateMap.contai
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r329036607 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -0,0 +1,1448 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link CopyOnWriteSkipListStateMap}. 
+ */ +public class CopyOnWriteSkipListStateMapTest extends TestLogger { + + private TestAllocator spaceAllocator; + + @Before + public void setUp() { + int maxAllocateSize = 256; + spaceAllocator = new TestAllocator(maxAllocateSize); + } + + @After + public void tearDown() { + IOUtils.closeQuietly(spaceAllocator); + } + + /** +* Test initialization of state map. +*/ + @Test + public void testInitStateMap() { + TypeSerializer keySerializer = IntSerializer.INSTANCE; + TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; + TypeSerializer stateSerializer = StringSerializer.INSTANCE; + CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( + keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + + assertTrue(stateMap.isEmpty()); + assertEquals(0, stateMap.size()); + assertEquals(0, stateMap.totalSize()); + assertEquals(0, stateMap.getRequestCount()); + assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty()); + assertEquals(0, stateMap.getHighestRequiredSnapshotVersion()); + assertEquals(0, stateMap.getHighestFinishedSnapshotVersion()); + assertTrue(stateMap.getSnapshotVersions().isEmpty()); + assertTrue(stateMap.getPruningValueNodes().isEmpty()); + assertEquals(0, stateMap.getResourceGuard().getLeaseCount()); + assertFalse(stateMap.getResourceGuard().isClosed()); + assertFalse(stateMap.isClosed()); + + assertNull(stateMap.get(0, 0L)); + assertFalse(stateMap.contai
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r329030111 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapTest.java ## @@ -0,0 +1,1448 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. 
+ * + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.core.memory.ByteArrayInputStreamWithPos; +import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests for {@link CopyOnWriteSkipListStateMap}. 
+ */ +public class CopyOnWriteSkipListStateMapTest extends TestLogger { + + private TestAllocator spaceAllocator; + + @Before + public void setUp() { + int maxAllocateSize = 256; + spaceAllocator = new TestAllocator(maxAllocateSize); + } + + @After + public void tearDown() { + IOUtils.closeQuietly(spaceAllocator); + } + + /** +* Test initialization of state map. +*/ + @Test + public void testInitStateMap() { + TypeSerializer keySerializer = IntSerializer.INSTANCE; + TypeSerializer namespaceSerializer = LongSerializer.INSTANCE; + TypeSerializer stateSerializer = StringSerializer.INSTANCE; + CopyOnWriteSkipListStateMap stateMap = new CopyOnWriteSkipListStateMap<>( + keySerializer, namespaceSerializer, stateSerializer, spaceAllocator); + + assertTrue(stateMap.isEmpty()); + assertEquals(0, stateMap.size()); + assertEquals(0, stateMap.totalSize()); + assertEquals(0, stateMap.getRequestCount()); + assertTrue(stateMap.getLogicallyRemovedNodes().isEmpty()); + assertEquals(0, stateMap.getHighestRequiredSnapshotVersion()); + assertEquals(0, stateMap.getHighestFinishedSnapshotVersion()); + assertTrue(stateMap.getSnapshotVersions().isEmpty()); + assertTrue(stateMap.getPruningValueNodes().isEmpty()); + assertEquals(0, stateMap.getResourceGuard().getLeaseCount()); + assertFalse(stateMap.getResourceGuard().isClosed()); + assertFalse(stateMap.isClosed()); + + assertNull(stateMap.get(0, 0L)); + assertFalse(stateMap.contai
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r328482896 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/SpaceUtils.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.space; + +import static org.apache.flink.runtime.state.heap.space.Constants.FOUR_BYTES_BITS; +import static org.apache.flink.runtime.state.heap.space.Constants.FOUR_BYTES_MARK; + +/** + * Utils. + */ +public class SpaceUtils { + + public static int getChunkIdByAddress(long offset) { + return (int) ((offset >>> FOUR_BYTES_BITS) & FOUR_BYTES_MARK); + } + + public static int getChunkOffsetByAddress(long offset) { + return (int) (offset & FOUR_BYTES_MARK); + } +} Review comment: I admit that we have sacrificed the readability for performance in multiple places, but for lower level implementations I'm afraid such tradeoff is necessary. We could also see examples in JDK implementation such as `ConcurrentSkipListMap` (smile). 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r328480573 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Chunk.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.space; + +import java.nio.ByteBuffer; + +/** + * Chunk is a contiguous byteBuffer. or logically contiguous space . + * for example: a Chunk is 1G space, maybe it's one big file, or multi 4M on-heap ByteBuffer + */ +public interface Chunk { + /** +* Try to allocate size bytes from the chunk. spaceSizeInfo will record occupied space size. +* +* @return the offset of the successful allocation, or -1 to indicate not-enough-space +*/ + int allocate(int len); + + /** +* release the space addressed by interChunkOffset. spaceSizeInfo will record occupied space size. 
+* +* @param interChunkOffset offset of the chunk +*/ + void free(int interChunkOffset); + + /** +* @return Id of this Chunk +*/ + int getChunkId(); + + int getChunkCapacity(); + + /** +* @return This chunk's backing ByteBuffer described by chunkOffset. Review comment: It's always backed by one or multiple `ByteBuffer`. For memory-mapped file it will be one `MappedByteBuffer` and for memory it will be one or multiple `HeapByteBuffer` or `DirectByteBuffer`. Will update the javadoc of the class to clarify these. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r328474557 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Chunk.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.space; + +import java.nio.ByteBuffer; + +/** + * Chunk is a contiguous byteBuffer. or logically contiguous space . + * for example: a Chunk is 1G space, maybe it's one big file, or multi 4M on-heap ByteBuffer + */ +public interface Chunk { + /** +* Try to allocate size bytes from the chunk. spaceSizeInfo will record occupied space size. +* +* @return the offset of the successful allocation, or -1 to indicate not-enough-space Review comment: Ditto. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r328474162 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Chunk.java ## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.space; + +import java.nio.ByteBuffer; + +/** + * Chunk is a contiguous byteBuffer. or logically contiguous space . + * for example: a Chunk is 1G space, maybe it's one big file, or multi 4M on-heap ByteBuffer + */ +public interface Chunk { + /** +* Try to allocate size bytes from the chunk. spaceSizeInfo will record occupied space size. Review comment: Some legacy javadoc comments, should be removed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r328473311 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/space/Allocator.java ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap.space; + +import java.io.Closeable; + +/** + * Implementations are responsible for allocate space. + */ +public interface Allocator extends Closeable { + + /** +* Allocate space with the given size. +* +* @param size size of space to allocate. +* @return address of the allocated space, or -1 when allocation is failed. +*/ + long allocate(int size); Review comment: We will check against `-1`, see `SkipListUtils.NIL_VALUE_POINTER`. We don't throw exception here because the definition of methods in `StateMap` don't throw exceptions, so we will need to handle the exception somewhere if we throw any in lower level functions, then no big difference from handling `-1`. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r328464527 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r328460576 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r328460043 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r328456850 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r328452591 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r328435002 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r328091422 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r328036088 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r327990096 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r327989026 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r326183003 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r326159393 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r326159038 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r325990356 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r325987134 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r325986596 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r325980641 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. 
+ * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. +*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still us
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r325743874 ## File path: flink-core/src/test/java/org/apache/flink/core/memory/ByteBufferUtilsTest.java ## @@ -0,0 +1,195 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.core.memory; + +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; + +/** + * Tests for {@link ByteBufferUtils}. 
+ */ +public class ByteBufferUtilsTest extends TestLogger { + + @Test + public void testDirectBBWriteAndRead() { + testWithDifferentOffset(true); + } + + @Test + public void testHeapBBWriteAndRead() { + testWithDifferentOffset(false); + } + + @Test + public void testCompareDirectBBToArray() { + testCompareTo(true, false, false); + } + + @Test + public void testCompareDirectBBToDirectBB() { + testCompareTo(true, true, true); + } + + @Test + public void testCompareDirectBBToHeapBB() { + testCompareTo(true, true, false); + } + + @Test + public void testCompareHeapBBToArray() { + testCompareTo(false, false, false); + } + + @Test + public void testCompareHeapBBToDirectBB() { + testCompareTo(false, true, true); + } + + @Test + public void testCompareHeapBBToHeapBB() { + testCompareTo(false, true, false); + } + + private void testCompareTo(boolean isLeftBBDirect, boolean isRightBuffer, boolean isRightDirect) { + testEquals(isLeftBBDirect, isRightBuffer, isRightDirect); + testLessThan(isLeftBBDirect, isRightBuffer, isRightDirect); + testGreaterThan(isLeftBBDirect, isRightBuffer, isRightDirect); + } + + private void testEquals(boolean isLeftBBDirect, boolean isRightBuffer, boolean isRightDirect) { + byte[] leftBufferBytes = new byte[]{'a', 'b', 'c', 'd', 'e'}; + byte[] rightBufferBytes = new byte[]{'b', 'c', 'd', 'e', 'f'}; + ByteBuffer left = isLeftBBDirect + ? ByteBuffer.allocateDirect(leftBufferBytes.length).put(leftBufferBytes) + : ByteBuffer.wrap(leftBufferBytes); + ByteBuffer right = null; + if (isRightBuffer) { + right = isRightDirect + ? ByteBuffer.allocateDirect(rightBufferBytes.length).put(rightBufferBytes) + : ByteBuffer.wrap(rightBufferBytes); + } + if (right != null) { + Assert.assertThat( + ByteBufferUtils.compareTo(left, 1, 4, right, 0, 4), Review comment: Just to verify the `compareTo` method could correctly work with non-zero offsets. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r325763513 ## File path: flink-core/src/test/java/org/apache/flink/core/memory/ByteBufferUtilsTest.java ## @@ -0,0 +1,195 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.core.memory; + +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; + +/** + * Tests for {@link ByteBufferUtils}. 
+ */ +public class ByteBufferUtilsTest extends TestLogger { + + @Test + public void testDirectBBWriteAndRead() { + testWithDifferentOffset(true); + } + + @Test + public void testHeapBBWriteAndRead() { + testWithDifferentOffset(false); + } + + @Test + public void testCompareDirectBBToArray() { + testCompareTo(true, false, false); + } + + @Test + public void testCompareDirectBBToDirectBB() { + testCompareTo(true, true, true); + } + + @Test + public void testCompareDirectBBToHeapBB() { + testCompareTo(true, true, false); + } + + @Test + public void testCompareHeapBBToArray() { + testCompareTo(false, false, false); + } + + @Test + public void testCompareHeapBBToDirectBB() { + testCompareTo(false, true, true); + } + + @Test + public void testCompareHeapBBToHeapBB() { + testCompareTo(false, true, false); + } + + private void testCompareTo(boolean isLeftBBDirect, boolean isRightBuffer, boolean isRightDirect) { + testEquals(isLeftBBDirect, isRightBuffer, isRightDirect); + testLessThan(isLeftBBDirect, isRightBuffer, isRightDirect); + testGreaterThan(isLeftBBDirect, isRightBuffer, isRightDirect); + } + + private void testEquals(boolean isLeftBBDirect, boolean isRightBuffer, boolean isRightDirect) { + byte[] leftBufferBytes = new byte[]{'a', 'b', 'c', 'd', 'e'}; + byte[] rightBufferBytes = new byte[]{'b', 'c', 'd', 'e', 'f'}; + ByteBuffer left = isLeftBBDirect + ? ByteBuffer.allocateDirect(leftBufferBytes.length).put(leftBufferBytes) + : ByteBuffer.wrap(leftBufferBytes); + ByteBuffer right = null; + if (isRightBuffer) { + right = isRightDirect + ? 
ByteBuffer.allocateDirect(rightBufferBytes.length).put(rightBufferBytes) + : ByteBuffer.wrap(rightBufferBytes); + } + if (right != null) { + Assert.assertThat( + ByteBufferUtils.compareTo(left, 1, 4, right, 0, 4), + is(0)); + } else { + Assert.assertThat( + ByteBufferUtils.compareTo(left, 1, 4, rightBufferBytes, 0, 4), + is(0)); + } + } + + private void testLessThan(boolean isLeftBBDirect, boolean isRightBuffer, boolean isRightDirect) { + byte[] leftBufferBytes = new byte[]{'a', 'b', 'c', 'd', 'e'}; + byte[] rightBufferBytes = new byte[]{'b', 'c', 'd', 'e', 'f'}; + ByteBuffer left = isLeftBBDirect + ? ByteBuffer.allocateDirect(leftBufferBytes.length).put(leftBufferBytes) + : ByteBuffer.wrap(leftBufferBytes); + ByteBuffer right = null; + if (isRightBuffer) { + right = isRightDirect + ? ByteBuffer.allocateDirect(rightBufferBytes.length).put(rightBufferBytes) + : ByteBuffer.wrap(rightBufferBytes); +
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r325712863 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListUtils.java ## @@ -0,0 +1,797 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.core.memory.ByteBufferUtils; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; + +/** + * Utilities for skip list. + */ +@SuppressWarnings("WeakerAccess") +public class SkipListUtils { + static final long NIL_NODE = -1; + static final long HEAD_NODE = -2; + static final long NIL_VALUE_POINTER = -1; + static final int MAX_LEVEL = 255; + static final int DEFAULT_LEVEL = 32; + static final int BYTE_MASK = 0xFF; + + /** +* Key space schema. 
+* - key meta +* -- int: level & status +* -- byte 0: level of node in skip list +* -- byte 1: node status +* -- byte 2: preserve +* -- byte 3: preserve +* -- int: length of key +* -- long: pointer to the newest value +* -- long: pointer to next node on level 0 +* -- long[]: array of pointers to next node on different levels excluding level 0 +* -- long[]: array of pointers to previous node on different levels excluding level 0 +* - byte[]: data of key +*/ + static final int KEY_META_OFFSET = 0; + static final int KEY_LEN_OFFSET = KEY_META_OFFSET + Integer.BYTES; + static final int VALUE_POINTER_OFFSET = KEY_LEN_OFFSET + Integer.BYTES; + static final int NEXT_KEY_POINTER_OFFSET = VALUE_POINTER_OFFSET + Long.BYTES; + static final int LEVEL_INDEX_OFFSET = NEXT_KEY_POINTER_OFFSET + Long.BYTES; + + + /** +* Pre-compute the offset of index for different levels to dismiss the duplicated +* computation at runtime. +*/ + private static final int[] INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY = new int[MAX_LEVEL + 1]; + + /** +* Pre-compute the length of key meta for different levels to dismiss the duplicated +* computation at runtime. +*/ + private static final int[] KEY_META_LEN_BY_LEVEL_ARRAY = new int[MAX_LEVEL + 1]; + + static { + for (int i = 1; i < INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY.length; i++) { + INDEX_NEXT_OFFSET_BY_LEVEL_ARRAY[i] = LEVEL_INDEX_OFFSET + (i - 1) * Long.BYTES; + } + + for (int i = 0; i < KEY_META_LEN_BY_LEVEL_ARRAY.length; i++) { + KEY_META_LEN_BY_LEVEL_ARRAY[i] = LEVEL_INDEX_OFFSET + 2 * i * Long.BYTES; + } + } + + /** +* Returns the level of the node. +* +* @param byteBuffer byte buffer for key space. +* @param offset offset of key space in the byte buffer. +*/ + public static int getLevel(ByteBuffer byteBuffer, int offset) { + return ByteBufferUtils.toInt(byteBuffer, offset + KEY_META_OFFSET) & BYTE_MASK; + } + + /** +* Returns the status of the node. +* +* @param byteBuffer byte buffer for key space. +* @param offset offset of key space in the byte buffer. 
+*/ + public static byte getNodeStatus(ByteBuffer byteBuffer, int offset) { Review comment: We thought adding a `valueOf` in `NodeStatus` and doing a switch on it would have worse performance, but after a JMH check there's no big difference between "firstly valueOf and then comparing the enum" and "pass the byte and compare with getValue", so we will change to use the more readable way. This is an automated message from the Apache Git Service
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r325650441 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapSnapshot.java ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.util.ResourceGuard; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * This class represents the snapshot of a {@link CopyOnWriteSkipListStateMap}. + * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMapSnapshot + extends StateMapSnapshot> { + + /** +* Version of the {@link CopyOnWriteSkipListStateMap} when this snapshot was created. This can be used to release the snapshot. +*/ + private final int snapshotVersion; + + /** The number of (non-null) entries in snapshotData. */ + @Nonnegative + private final int numberOfEntriesInSnapshotData; + + /** +* This lease protects the state map resources. +*/ + private final ResourceGuard.Lease lease; + + /** +* Creates a new {@link CopyOnWriteSkipListStateMap}. +* +* @param owningStateMap the {@link CopyOnWriteSkipListStateMap} for which this object represents a snapshot. +* @param lease the lease protects the state map resources. 
+*/ + CopyOnWriteSkipListStateMapSnapshot( + CopyOnWriteSkipListStateMap owningStateMap, + ResourceGuard.Lease lease) { + super(owningStateMap); + + this.snapshotVersion = owningStateMap.getStateMapVersion(); + this.numberOfEntriesInSnapshotData = owningStateMap.size(); + this.lease = lease; + } + + /** +* Returns the internal version of the when this snapshot was created. +*/ + int getSnapshotVersion() { + return snapshotVersion; + } + + @Override + public void release() { + owningStateMap.releaseSnapshot(this); + lease.close(); + } + + @Override + public void writeState( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + TypeSerializer stateSerializer, + @Nonnull DataOutputView dov, + @Nullable StateSnapshotTransformer stateSnapshotTransformer) throws IOException { + if (stateSnapshotTransformer == null) { + writeStateWithNoTransform(dov); + } else { + writeStateWithTransform(stateSerializer, dov, stateSnapshotTransformer); + } + } + + private void writeStateWithNoTransform(@Nonnull DataOutputView dov) throws IOException { + dov.writeInt(numberOfEntriesInSnapshotData); + SnapshotNodeIterator nodeIterator = new SnapshotNodeIterator(true); + while (nodeIterator.hasNext()) { + Tuple2 tuple = nodeIterator.next(); + writeKeyAndNamespace(tuple.f0, dov); + writeValue(tuple.f1, dov); + } + } + + private void writeStateWithTransform( + TypeSerializer stateSerializer, + @Nonnull DataOutputVi
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r325646539 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapSnapshot.java ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.util.ResourceGuard; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * This class represents the snapshot of a {@link CopyOnWriteSkipListStateMap}. + * + * @param type of key + * @param type of namespace + * @param type of state + */ +public class CopyOnWriteSkipListStateMapSnapshot + extends StateMapSnapshot> { + + /** +* Version of the {@link CopyOnWriteSkipListStateMap} when this snapshot was created. This can be used to release the snapshot. +*/ + private final int snapshotVersion; + + /** The number of (non-null) entries in snapshotData. */ + @Nonnegative + private final int numberOfEntriesInSnapshotData; + + /** +* This lease protects the state map resources. +*/ + private final ResourceGuard.Lease lease; + + /** +* Creates a new {@link CopyOnWriteSkipListStateMap}. +* +* @param owningStateMap the {@link CopyOnWriteSkipListStateMap} for which this object represents a snapshot. +* @param lease the lease protects the state map resources. 
+*/ + CopyOnWriteSkipListStateMapSnapshot( + CopyOnWriteSkipListStateMap owningStateMap, + ResourceGuard.Lease lease) { + super(owningStateMap); + + this.snapshotVersion = owningStateMap.getStateMapVersion(); + this.numberOfEntriesInSnapshotData = owningStateMap.size(); + this.lease = lease; + } + + /** +* Returns the internal version of the when this snapshot was created. +*/ + int getSnapshotVersion() { + return snapshotVersion; + } + + @Override + public void release() { + owningStateMap.releaseSnapshot(this); + lease.close(); + } + + @Override + public void writeState( + TypeSerializer keySerializer, + TypeSerializer namespaceSerializer, + TypeSerializer stateSerializer, + @Nonnull DataOutputView dov, + @Nullable StateSnapshotTransformer stateSnapshotTransformer) throws IOException { + if (stateSnapshotTransformer == null) { + writeStateWithNoTransform(dov); + } else { + writeStateWithTransform(stateSerializer, dov, stateSnapshotTransformer); + } + } + + private void writeStateWithNoTransform(@Nonnull DataOutputView dov) throws IOException { + dov.writeInt(numberOfEntriesInSnapshotData); + SnapshotNodeIterator nodeIterator = new SnapshotNodeIterator(true); + while (nodeIterator.hasNext()) { + Tuple2 tuple = nodeIterator.next(); + writeKeyAndNamespace(tuple.f0, dov); + writeValue(tuple.f1, dov); + } + } + + private void writeStateWithTransform( + TypeSerializer stateSerializer, + @Nonnull DataOutputVi
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r319518588 ## File path: flink-core/src/test/java/org/apache/flink/core/memory/ByteBufferUtilsTest.java ## @@ -0,0 +1,101 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.core.memory; + +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; + +/** + * Tests for {@link ByteBufferUtils}. 
+ */ +public class ByteBufferUtilsTest { + + @Test + public void testBBWriteAndRead() { + + ByteBuffer bb = ByteBuffer.allocateDirect(4096); + doTest(bb, 0); + doTest(bb, 1); + doTest(bb, 2); + doTest(bb, 3); + doTest(bb, 4); + doTest(bb, 5); + doTest(bb, 6); + doTest(bb, 7); + + bb = ByteBuffer.allocate(4096); + doTest(bb, 0); + doTest(bb, 1); + doTest(bb, 2); + doTest(bb, 3); + doTest(bb, 4); + doTest(bb, 5); + doTest(bb, 6); + doTest(bb, 7); + } + + @Test + public void testCompareHeapBufferWithDirectBuffer() { + byte[] bytes = new byte[]{'a', 'b', 'c', 'd', 'e'}; + final int len = bytes.length; + ByteBuffer heapBuffer = ByteBuffer.wrap(bytes); + ByteBuffer directBuffer = ByteBuffer.allocateDirect(len); + directBuffer.put(bytes); + int res = ByteBufferUtils.compareTo(heapBuffer, 1, len - 1, directBuffer, 1, len - 1); + Assert.assertThat(res, is(0)); + res = ByteBufferUtils.compareTo(heapBuffer, 0, len - 1, directBuffer, 1, len - 1); + Assert.assertThat(res, lessThan(0)); + res = ByteBufferUtils.compareTo(heapBuffer, 1, len - 1, directBuffer, 0, len - 1); + Assert.assertThat(res, greaterThan(0)); + } + + private void doTest(ByteBuffer bb, int offset) { + int positionOri = bb.position(); + + ByteBufferUtils.putInt(bb, offset, 123); + Assert.assertEquals(bb.position(), positionOri); + Assert.assertEquals(123, ByteBufferUtils.toInt(bb, offset)); + Assert.assertEquals(bb.position(), positionOri); + + ByteBufferUtils.putLong(bb, offset + 4, 1234); + Assert.assertEquals(bb.position(), positionOri); + Assert.assertEquals(1234, ByteBufferUtils.toLong(bb, offset + 4)); + Assert.assertEquals(bb.position(), positionOri); + + Assert.assertEquals(123, ByteBufferUtils.toInt(bb, offset)); + Assert.assertEquals(bb.position(), positionOri); + + ByteBuffer bb2 = ByteBuffer.allocate(12); + int positionOri2 = bb2.position(); + ByteBufferUtils.copyFromBufferToBuffer(bb, offset, bb2, 0, 12); + + Assert.assertEquals(ByteBufferUtils.toInt(bb2, 0), 123); + 
Assert.assertEquals(ByteBufferUtils.toLong(bb2, 4), 1234); + Assert.assertEquals(bb.position(), positionOri); + Assert.assertEquals(bb2.position(), positionOri2); Review comment: This case is designed for testing composite operations, will add some comments to help understand the logic. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r319333262 ## File path: flink-core/src/main/java/org/apache/flink/core/memory/UnsafeHelp.java ## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.memory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.misc.Unsafe; + +import javax.annotation.Nonnull; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.security.AccessController; +import java.security.PrivilegedAction; + +/** + * Unsafe use help. + */ +public class UnsafeHelp { Review comment: Yes, mainly referred to HBase's [UnsafeAccess](https://github.com/apache/hbase/blob/master/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java), will add a comment to mention the source. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r319071340 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/ByteBufferInputStreamWithPos.java ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * Un-synchronized input stream using the given byte buffer. + */ +public class ByteBufferInputStreamWithPos extends InputStream { Review comment: Here is the result of JMH benchmark, where "Orig" means the original implementation of `ByteArrayInputStreamWithPos` and "New" means the new one rebased on `ByteBufferInputStreamWithPos`. From the result we could see the initialization will indeed take more time due to constructing `HeapByteBuffer` but there's no regression on `read` functions, so I think the real impact of the refactor is ok. ``` # Run complete. 
Total time: 00:06:08 Benchmark Mode Cnt Score Error Units ByteArrayInputStreamBenchmark.testInitNew thrpt 30 51998.302 ± 2460.797 ops/ms ByteArrayInputStreamBenchmark.testInitOrigthrpt 30 102891.299 ± 6027.684 ops/ms ByteArrayInputStreamBenchmark.testReadIntoBufferNew thrpt 30 22985.900 ± 1437.231 ops/ms ByteArrayInputStreamBenchmark.testReadIntoBufferOrig thrpt 30 24436.653 ± 297.965 ops/ms ByteArrayInputStreamBenchmark.testReadNew thrpt 30 25100.684 ± 1466.822 ops/ms ByteArrayInputStreamBenchmark.testReadOrigthrpt 30 26607.799 ± 413.156 ops/ms # Run complete. Total time: 00:06:09 Benchmark Mode Cnt Score Error Units ByteArrayInputStreamBenchmark.testInitNew thrpt 30 49778.439 ± 2988.952 ops/ms ByteArrayInputStreamBenchmark.testInitOrigthrpt 30 100307.654 ± 16369.951 ops/ms ByteArrayInputStreamBenchmark.testReadIntoBufferNew thrpt 30 22838.932 ± 1315.758 ops/ms ByteArrayInputStreamBenchmark.testReadIntoBufferOrig thrpt 30 22266.972 ± 1438.928 ops/ms ByteArrayInputStreamBenchmark.testReadNew thrpt 30 25849.028 ± 1521.414 ops/ms ByteArrayInputStreamBenchmark.testReadOrigthrpt 30 25672.482 ± 1196.857 ops/ms # Run complete. Total time: 00:06:07 Benchmark Mode Cnt Score Error Units ByteArrayInputStreamBenchmark.testInitNew thrpt 30 53944.255 ± 1009.925 ops/ms ByteArrayInputStreamBenchmark.testInitOrigthrpt 30 117808.909 ± 2655.215 ops/ms ByteArrayInputStreamBenchmark.testReadIntoBufferNew thrpt 30 24388.177 ± 346.684 ops/ms ByteArrayInputStreamBenchmark.testReadIntoBufferOrig thrpt 30 24491.455 ± 323.040 ops/ms ByteArrayInputStreamBenchmark.testReadNew thrpt 30 27081.540 ± 471.671 ops/ms ByteArrayInputStreamBenchmark.testReadOrigthrpt 30 26694.972 ± 348.197 ops/ms ``` Attached is the patch of the benchmark, based on the current flink-benchmark master branch. 
Command used to run the benchmark is `mvn -Dflink.version=1.10-SNAPSHOT clean package exec:exec -Dexec.executable=java -Dexec.args="-jar target/benchmarks.jar -rf csv org.apache.flink.benchmark.memory.*" [bytearrayinputstream.benchmark.patch](https://github.com/apache/flink/files/3555750/bytearrayinputstream.benchmark.patch.txt) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us..
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r319071340 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/ByteBufferInputStreamWithPos.java ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * Un-synchronized input stream using the given byte buffer. + */ +public class ByteBufferInputStreamWithPos extends InputStream { Review comment: Here is the result of JMH benchmark, where "Orig" means the original implementation of `ByteArrayInputStreamWithPos` and "New" means the new one rebased on `ByteBufferInputStreamWithPos`. From the result we could see the initialization will indeed take more time due to constructing `HeapByteBuffer` but there's no regression on `read` functions, so I think the real impact of the refactor is ok. ``` # Run complete. 
Total time: 00:06:08 Benchmark Mode Cnt Score Error Units ByteArrayInputStreamBenchmark.testInitNew thrpt 30 51998.302 ± 2460.797 ops/ms ByteArrayInputStreamBenchmark.testInitOrig thrpt 30 102891.299 ± 6027.684 ops/ms ByteArrayInputStreamBenchmark.testReadIntoBufferNew thrpt 30 22985.900 ± 1437.231 ops/ms ByteArrayInputStreamBenchmark.testReadIntoBufferOrig thrpt 30 24436.653 ± 297.965 ops/ms ByteArrayInputStreamBenchmark.testReadNew thrpt 30 25100.684 ± 1466.822 ops/ms ByteArrayInputStreamBenchmark.testReadOrig thrpt 30 26607.799 ± 413.156 ops/ms # Run complete. Total time: 00:06:09 Benchmark Mode Cnt Score Error Units ByteArrayInputStreamBenchmark.testInitNew thrpt 30 49778.439 ± 2988.952 ops/ms ByteArrayInputStreamBenchmark.testInitOrig thrpt 30 100307.654 ± 16369.951 ops/ms ByteArrayInputStreamBenchmark.testReadIntoBufferNew thrpt 30 22838.932 ± 1315.758 ops/ms ByteArrayInputStreamBenchmark.testReadIntoBufferOrig thrpt 30 22266.972 ± 1438.928 ops/ms ByteArrayInputStreamBenchmark.testReadNew thrpt 30 25849.028 ± 1521.414 ops/ms ByteArrayInputStreamBenchmark.testReadOrig thrpt 30 25672.482 ± 1196.857 ops/ms # Run complete. Total time: 00:06:07 Benchmark Mode Cnt Score Error Units ByteArrayInputStreamBenchmark.testInitNew thrpt 30 53944.255 ± 1009.925 ops/ms ByteArrayInputStreamBenchmark.testInitOrig thrpt 30 117808.909 ± 2655.215 ops/ms ByteArrayInputStreamBenchmark.testReadIntoBufferNew thrpt 30 24388.177 ± 346.684 ops/ms ByteArrayInputStreamBenchmark.testReadIntoBufferOrig thrpt 30 24491.455 ± 323.040 ops/ms ByteArrayInputStreamBenchmark.testReadNew thrpt 30 27081.540 ± 471.671 ops/ms ByteArrayInputStreamBenchmark.testReadOrig thrpt 30 26694.972 ± 348.197 ops/ms ``` Attached is the patch of the benchmark, based on the current flink-benchmark master branch. 
Command used to run the benchmark is `mvn -Dflink.version=1.10-SNAPSHOT clean package exec:exec -Dexec.executable=java -Dexec.args="-jar target/benchmarks.jar -rf csv org.apache.flink.benchmark.memory.*"` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r318412154 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. 
+*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still used by unreleased snapshots. +*/ + private final TreeSet snapshotVersions; + + /** +* The size of skip list wh
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r318412223 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. 
+*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still used by unreleased snapshots. +*/ + private final TreeSet snapshotVersions; + + /** +* The size of skip list wh
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r318379606 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. 
+*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still used by unreleased snapshots. +*/ + private final TreeSet snapshotVersions; + + /** +* The size of skip list wh
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r318379352 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. 
+*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still used by unreleased snapshots. +*/ + private final TreeSet snapshotVersions; + + /** +* The size of skip list wh
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r318375048 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. 
+*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still used by unreleased snapshots. +*/ + private final TreeSet snapshotVersions; + + /** +* The size of skip list wh
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r318374178 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/ByteBufferInputStreamWithPos.java ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * Un-synchronized input stream using the given byte buffer. + */ +public class ByteBufferInputStreamWithPos extends InputStream { Review comment: Will move `ByteBufferInputStreamWithPos` into flink-core and rebase `ByteArrayInputStreamWithPos` based on it. I think the performance impact of `ByteBuffer.wrap` should be ok since the main cost is to construct a `HeapByteBuffer` instance, but will use JMH to identify the details. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r317920996 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/ByteBufferUtils.java ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import javax.annotation.Nonnull; + +import java.lang.reflect.Field; +import java.nio.ByteBuffer; + +/** + * Utilities to get/put data to {@link ByteBuffer}. All methods don't change + * byte buffer's position. 
+ */ +@SuppressWarnings({"WeakerAccess", "unused", "UnusedReturnValue"}) +public class ByteBufferUtils { + + private static final boolean UNSAFE_AVAIL = UnsafeHelp.isAvailable(); + private static final boolean UNSAFE_UNALIGNED = UnsafeHelp.unaligned(); + private static final Field ACCESS_FIELD; + + static { + try { + ACCESS_FIELD = java.nio.Buffer.class.getDeclaredField("address"); + ACCESS_FIELD.setAccessible(true); + } catch (NoSuchFieldException e) { + throw new RuntimeException("Failed to get address method from java.nio.Buffer", e); + } + } + + /** +* Reads an int value at the given buffer's offset. +* +* @param buffer the given buffer +* @param offset the given buffer's offset +* @return int value at offset +*/ + public static int toInt(ByteBuffer buffer, int offset) { + if (UNSAFE_UNALIGNED) { + return UnsafeHelp.toInt(buffer, offset); + } else { + return buffer.getInt(offset); + } + } + + /** +* Reads a long value at the given buffer's offset. +* +* @param buffer the given buffer +* @param offset the given buffer's offset +* @return long value at offset +*/ + public static long toLong(ByteBuffer buffer, int offset) { + if (UNSAFE_UNALIGNED) { + return UnsafeHelp.toLong(buffer, offset); + } else { + return buffer.getLong(offset); + } + } + + /** +* Reads a short value at the given buffer's offset. 
+* +* @param buffer the given buffer +* @param offset the given buffer's offset +* @return short value at offset +*/ + public static short toShort(ByteBuffer buffer, int offset) { + if (UNSAFE_UNALIGNED) { + return UnsafeHelp.toShort(buffer, offset); + } else { + return buffer.getShort(offset); + } + } + + public static byte toByte(ByteBuffer buffer, int offset) { + if (UnsafeHelp.isAvailable()) { + return UnsafeHelp.toByte(buffer, offset); + } else { + return buffer.get(offset); + } + } + + public static void putInt(ByteBuffer buffer, int index, int val) { + if (UNSAFE_UNALIGNED) { + UnsafeHelp.putInt(buffer, index, val); + } else { + buffer.putInt(index, val); + } + } + + public static void putLong(ByteBuffer buffer, int index, long val) { + if (UNSAFE_UNALIGNED) { + UnsafeHelp.putLong(buffer, index, val); + } else { + buffer.putLong(index, val); + } + } + + /** +* Copy from one buffer to another from given offset. This will be absolute positional copying and +* won't affect the position of any of the buffers. +* +* @param inthe given buffer to read +* @param out the given buffer of destination +* @param sourceOffset the given buffer's offset of src +* @param destinationOffset the g
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r317895515 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/ByteBufferUtils.java ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import javax.annotation.Nonnull; + +import java.lang.reflect.Field; +import java.nio.ByteBuffer; + +/** + * Utilities to get/put data to {@link ByteBuffer}. All methods don't change + * byte buffer's position. 
+ */ +@SuppressWarnings({"WeakerAccess", "unused", "UnusedReturnValue"}) +public class ByteBufferUtils { + + private static final boolean UNSAFE_AVAIL = UnsafeHelp.isAvailable(); + private static final boolean UNSAFE_UNALIGNED = UnsafeHelp.unaligned(); + private static final Field ACCESS_FIELD; + + static { + try { + ACCESS_FIELD = java.nio.Buffer.class.getDeclaredField("address"); + ACCESS_FIELD.setAccessible(true); + } catch (NoSuchFieldException e) { + throw new RuntimeException("Failed to get address method from java.nio.Buffer", e); + } + } + + /** +* Reads an int value at the given buffer's offset. +* +* @param buffer the given buffer +* @param offset the given buffer's offset +* @return int value at offset +*/ + public static int toInt(ByteBuffer buffer, int offset) { + if (UNSAFE_UNALIGNED) { + return UnsafeHelp.toInt(buffer, offset); + } else { + return buffer.getInt(offset); + } + } + + /** +* Reads a long value at the given buffer's offset. +* +* @param buffer the given buffer +* @param offset the given buffer's offset +* @return long value at offset +*/ + public static long toLong(ByteBuffer buffer, int offset) { + if (UNSAFE_UNALIGNED) { + return UnsafeHelp.toLong(buffer, offset); + } else { + return buffer.getLong(offset); + } + } + + /** +* Reads a short value at the given buffer's offset. 
+* +* @param buffer the given buffer +* @param offset the given buffer's offset +* @return short value at offset +*/ + public static short toShort(ByteBuffer buffer, int offset) { + if (UNSAFE_UNALIGNED) { + return UnsafeHelp.toShort(buffer, offset); + } else { + return buffer.getShort(offset); + } + } + + public static byte toByte(ByteBuffer buffer, int offset) { + if (UnsafeHelp.isAvailable()) { + return UnsafeHelp.toByte(buffer, offset); + } else { + return buffer.get(offset); + } + } + + public static void putInt(ByteBuffer buffer, int index, int val) { + if (UNSAFE_UNALIGNED) { + UnsafeHelp.putInt(buffer, index, val); + } else { + buffer.putInt(index, val); + } + } + + public static void putLong(ByteBuffer buffer, int index, long val) { + if (UNSAFE_UNALIGNED) { + UnsafeHelp.putLong(buffer, index, val); + } else { + buffer.putLong(index, val); + } + } + + /** +* Copy from one buffer to another from given offset. This will be absolute positional copying and +* won't affect the position of any of the buffers. +* +* @param inthe given buffer to read +* @param out the given buffer of destination +* @param sourceOffset the given buffer's offset of src +* @param destinationOffset the g
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r317895384 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/ByteBufferUtils.java ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import javax.annotation.Nonnull; + +import java.lang.reflect.Field; +import java.nio.ByteBuffer; + +/** + * Utilities to get/put data to {@link ByteBuffer}. All methods don't change + * byte buffer's position. + */ +@SuppressWarnings({"WeakerAccess", "unused", "UnusedReturnValue"}) Review comment: Will move this class to `org.apache.flink.core.memory` in flink-core, then we don't need the `WeakerAccess` warning suppress anymore. The current "unused" method might be used latter since this is only part of the whole work, will add test case to cover it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r317887398 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/ByteBufferInputStreamWithPos.java ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; Review comment: Agree that we'd better use another package name. How about we rename the package as the last step, after all review comments are addressed, just to make the review easier? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r317013585 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. 
+*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still used by unreleased snapshots. +*/ + private final TreeSet snapshotVersions; + + /** +* The size of skip list wh
[GitHub] [flink] carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend
carp84 commented on a change in pull request #9501: [FLINK-12697] [State Backends] Support on-disk state storage for spill-able heap backend URL: https://github.com/apache/flink/pull/9501#discussion_r317013585 ## File path: flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java ## @@ -0,0 +1,1505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.StateEntry; +import org.apache.flink.runtime.state.StateTransformationFunction; +import org.apache.flink.runtime.state.heap.space.Allocator; +import org.apache.flink.runtime.state.heap.space.Chunk; +import org.apache.flink.runtime.state.heap.space.SpaceUtils; +import org.apache.flink.runtime.state.internal.InternalKvState; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ResourceGuard; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.Spliterators; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static org.apache.flink.runtime.state.heap.SkipListUtils.HEAD_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_NODE; +import static org.apache.flink.runtime.state.heap.SkipListUtils.NIL_VALUE_POINTER; + +/** + * Implementation of state map which is based on skip list with copy-on-write support. states will + * be serialized to bytes and stored in the space allocated with the given allocator. + */ +public class CopyOnWriteSkipListStateMap extends StateMap implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(CopyOnWriteSkipListStateMap.class); + + /** +* Default max number of logically-removed keys to delete one time. 
+*/ + private static final int DEFAULT_MAX_KEYS_TO_DELETE_ONE_TIME = 3; + + /** +* Default ratio of the logically-removed keys to trigger deletion when snapshot. +*/ + private static final float DEFAULT_LOGICAL_REMOVED_KEYS_RATIO = 0.2f; + + /** +* The serializer used to serialize the key and namespace to bytes stored in skip list. +*/ + private final SkipListKeySerializer skipListKeySerializer; + + /** +* The serializer used to serialize the state to bytes stored in skip list. +*/ + private final SkipListValueSerializer skipListValueSerializer; + + /** +* Space allocator. +*/ + private final Allocator spaceAllocator; + + /** +* The level index header. +*/ + private final LevelIndexHeader levelIndexHeader; + + /** +* Seed to generate random index level. +*/ + private int randomSeed; + + /** +* The current version of this map. Used for copy-on-write mechanics. +*/ + private int stateMapVersion; + + /** +* The highest version of this map that is still required by any unreleased snapshot. +*/ + private int highestRequiredSnapshotVersion; + + /** +* Snapshots no more than this version must have been finished, but there may be some +* snapshots more than this version are still running. +*/ + private volatile int highestFinishedSnapshotVersion; + + /** +* Maintains an ordered set of version ids that are still used by unreleased snapshots. +*/ + private final TreeSet snapshotVersions; + + /** +* The size of skip list wh