[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246602226 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java --- @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.IntFunction; + +/** + * This collection is a concurrent append-only list that grows in chunks. + * It's safe to be used by many threads concurrently and has a max capacity of {@link Integer#MAX_VALUE}. + */ +public final class ConcurrentAppendOnlyChunkedList { --- End diff -- Thanks to you guys, and one day I will be better at naming things too, I swear :D ---
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246602031 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { --- End diff -- It's not that easy here: this is not a common collection (in theory, though we can surely agree on the contract); it is more of an append-only list that allows indexed queries, like a map.
If I remember correctly, it is similar to a [hashed array tree](https://en.wikipedia.org/wiki/Hashed_array_tree), where the top-level directory is a doubly linked list of "folders" (instead of an array, as in the original implementation): indeed, in the code there are chunkIndex (== the key into the top-level directory) and offset (== the key of a leaf within a directory). ---
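The chunkIndex/offset split described here can be sketched as plain index math (illustrative only, not Artemis code; a power-of-two chunk size is assumed, as in the patch):

```java
// Illustrative sketch of the hashed-array-tree index math described above.
// CHUNK_SIZE is assumed to be a power of two, as in the patch.
public final class ChunkIndexMath {
   static final int CHUNK_SIZE = 32;
   static final int CHUNK_MASK = CHUNK_SIZE - 1;
   static final int CHUNK_SIZE_LOG2 = Integer.numberOfTrailingZeros(CHUNK_SIZE);

   // key into the top-level directory: which "folder" holds the element
   static int chunkIndex(int globalIndex) {
      return globalIndex >> CHUNK_SIZE_LOG2; // fast divide by CHUNK_SIZE
   }

   // key of the leaf inside that folder
   static int offset(int globalIndex) {
      return globalIndex & CHUNK_MASK;       // fast modulo CHUNK_SIZE
   }

   public static void main(String[] args) {
      // element 70 = 2 * 32 + 6 -> chunk 2, offset 6
      System.out.println(chunkIndex(70) + "," + offset(70)); // prints "2,6"
   }
}
```

The two keys together uniquely address an element without ever moving already-stored entries, which is what makes the append-only guarantee cheap.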
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246601217 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java --- @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.IntFunction; + +/** + * This collection is a concurrent append-only list that grows in chunks. + * It's safe to be used by many threads concurrently and has a max capacity of {@link Integer#MAX_VALUE}. + */ +public final class ConcurrentAppendOnlyChunkedList { --- End diff -- This is a lot better! ---
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246601169 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? 
+ if (messageNumber >= size) { +return null; + } + //publish it for others consumers + LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size); + } + final AtomicChunk buffer; + final int offset; + if (messageNumber >= chunkSize) { + offset = messageNumber & chunkMask; + //slow path is moved in a separate method + buffer = jump(messageNumber, size); + } else { + offset = messageNumber; + buffer = consumerBuffer; + } + //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set + PagedMessage msg; + while ((msg = buffer.get(offset)) == null) { + Thread.yield(); + } + return msg; + } + + /** +* Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries +* ie backward search of a node if needed. +*/ + private AtomicChunk jump(final int messageNumber, final long size) { + //fast division by a power of 2 + final int jumps = messageNumber >> chunkSizeLog2; + //size is never allowed to be > Integer.MAX_VALUE + final int lastChunkIndex = (int) size >> chunkSizeLog2; + int requiredJumps = jumps; + AtomicChunk jumpBuffer = null; + boolean jumpForward = true; + int distanceFromLastChunkIndex = lastChunkIndex - jumps; + //it's worth to go backward from lastChunkIndex? 
+ //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer + if (distanceFromLastChunkIndex < jumps) { + final AtomicChunk producer = producerBuffer; + //producer is a potential moving, always increasing, target ie better to re-check the distance + distanceFromLastChunkIndex = producer.index - jumps; + if (distanceFromLastChunkIndex < jumps) { +//we're saving some jumps ie is fine to go backward from here +jumpBuffer = producer; +requiredJumps = distanceFromLastChunkIndex; +jumpForward = false; + } + } + //start from the consumer buffer only is needed + if (jumpBuffer == null) { + jumpBuffer = consumerBuffer; + } + for (int i = 0; i < requiredJumps; i++) { + //next chunk is always set if below a read producerIndex value + //previous chunk is final and can be safely read + jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev; + } + return jumpBuffer; } @Override - public synchronized boolean isLive() { + public boolean isLive() { return isLive; } @Override - public synchronized void addLiveMessage(PagedMessage message) { + public void addLiveMessage(PagedMessage
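The getNumberOfMessages() loop quoted above spins on a sentinel value that a producer publishes while appending a new chunk. A minimal standalone sketch of that pattern (hypothetical class; the real code uses an AtomicLongFieldUpdater rather than AtomicLong):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the spin-on-sentinel size read used in the quoted diff.
// While a producer appends a new chunk it publishes RESIZING; readers
// yield and retry until a real size is visible again.
final class SpinSizeSketch {
   static final long RESIZING = -1L;
   final AtomicLong producerIndex = new AtomicLong(0);

   int size() {
      while (true) {
         final long size = producerIndex.get();
         if (size == RESIZING) {
            Thread.yield(); // a producer is mid-resize; retry
            continue;
         }
         // the logical size is a long, but the API returns an int
         return (int) Math.min(size, Integer.MAX_VALUE);
      }
   }

   // producer side: mark, grow, then publish the new size
   void appendChunk(long newSize) {
      producerIndex.set(RESIZING);
      // ... allocate and link the new chunk here ...
      producerIndex.set(newSize);
   }
}
```

The readers never block on a lock; they only briefly yield while the (single) resize window is open.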
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r24662 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -28,15 +28,49 @@ /** * This is the same as PageCache, however this is for the page that's being currently written. */ --- End diff -- @clebertsuconic @michaelandrepearce Good points, guys: I have re-implemented the logic as a proper collection (though it won't implement the canonical `Collection` interfaces, because it is not canonical at all) ---
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246599701 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? 
+ if (messageNumber >= size) { +return null; + } + //publish it for others consumers + LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size); + } + final AtomicChunk buffer; + final int offset; + if (messageNumber >= chunkSize) { + offset = messageNumber & chunkMask; + //slow path is moved in a separate method + buffer = jump(messageNumber, size); + } else { + offset = messageNumber; + buffer = consumerBuffer; + } + //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set + PagedMessage msg; + while ((msg = buffer.get(offset)) == null) { + Thread.yield(); + } + return msg; + } + + /** +* Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries +* ie backward search of a node if needed. +*/ + private AtomicChunk jump(final int messageNumber, final long size) { + //fast division by a power of 2 + final int jumps = messageNumber >> chunkSizeLog2; --- End diff -- Heh, you're right :P, but the code of this collection came from a version where `chunkSize` was not a static final constant: in the new version it is clearer why I did it ---
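As a side note on the shift being discussed: for non-negative indices, `x >> chunkSizeLog2` is exactly `x / chunkSize` whenever chunkSize is a power of two. A quick illustrative check (not Artemis code):

```java
// Demonstrates that, for non-negative indices and a power-of-two chunk
// size, shifting right by log2(chunkSize) equals plain integer division.
public final class ShiftVsDivide {
   static int byShift(int index, int chunkSizeLog2) {
      return index >> chunkSizeLog2;
   }

   static int byDivision(int index, int chunkSize) {
      return index / chunkSize;
   }

   public static void main(String[] args) {
      final int chunkSize = 64; // any power of two works
      final int log2 = Integer.numberOfTrailingZeros(chunkSize);
      for (int i = 0; i < 10_000; i++) {
         if (byShift(i, log2) != byDivision(i, chunkSize)) {
            throw new AssertionError("mismatch at " + i);
         }
      }
      System.out.println("ok"); // prints "ok"
   }
}
```

With a `static final` power-of-two constant the JIT can perform this strength reduction by itself, which is why the manual shift mattered only when chunkSize was a runtime value.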
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246578304 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { --- End diff -- if this were a collection, this would almost certainly be throwing IllegalArgumentException. ---
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246577156 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? 
+ if (messageNumber >= size) { +return null; + } + //publish it for others consumers + LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size); + } + final AtomicChunk buffer; + final int offset; + if (messageNumber >= chunkSize) { + offset = messageNumber & chunkMask; + //slow path is moved in a separate method + buffer = jump(messageNumber, size); + } else { + offset = messageNumber; + buffer = consumerBuffer; + } + //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set + PagedMessage msg; + while ((msg = buffer.get(offset)) == null) { + Thread.yield(); + } + return msg; + } + + /** +* Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries +* ie backward search of a node if needed. +*/ + private AtomicChunk jump(final int messageNumber, final long size) { + //fast division by a power of 2 + final int jumps = messageNumber >> chunkSizeLog2; --- End diff -- cough ;) ... for the same comment you left me :P ---
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246574775 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? 
+ if (messageNumber >= size) { +return null; + } + //publish it for others consumers + LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size); + } + final AtomicChunk buffer; + final int offset; + if (messageNumber >= chunkSize) { + offset = messageNumber & chunkMask; + //slow path is moved in a separate method + buffer = jump(messageNumber, size); + } else { + offset = messageNumber; + buffer = consumerBuffer; + } + //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set + PagedMessage msg; + while ((msg = buffer.get(offset)) == null) { + Thread.yield(); + } + return msg; + } + + /** +* Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries +* ie backward search of a node if needed. +*/ + private AtomicChunk jump(final int messageNumber, final long size) { + //fast division by a power of 2 + final int jumps = messageNumber >> chunkSizeLog2; + //size is never allowed to be > Integer.MAX_VALUE + final int lastChunkIndex = (int) size >> chunkSizeLog2; + int requiredJumps = jumps; + AtomicChunk jumpBuffer = null; + boolean jumpForward = true; + int distanceFromLastChunkIndex = lastChunkIndex - jumps; + //it's worth to go backward from lastChunkIndex? 
+ //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer + if (distanceFromLastChunkIndex < jumps) { + final AtomicChunk producer = producerBuffer; + //producer is a potential moving, always increasing, target ie better to re-check the distance + distanceFromLastChunkIndex = producer.index - jumps; + if (distanceFromLastChunkIndex < jumps) { +//we're saving some jumps ie is fine to go backward from here +jumpBuffer = producer; +requiredJumps = distanceFromLastChunkIndex; +jumpForward = false; + } + } + //start from the consumer buffer only is needed + if (jumpBuffer == null) { + jumpBuffer = consumerBuffer; + } + for (int i = 0; i < requiredJumps; i++) { + //next chunk is always set if below a read producerIndex value + //previous chunk is final and can be safely read + jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev; + } + return jumpBuffer; } @Override - public synchronized boolean isLive() { + public boolean isLive() { return isLive; } @Override - public synchronized void addLiveMessage(PagedMessage message) { + public void addLiveMessage(PagedMessage message) {
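The jump() helper quoted above chooses between walking forward from the first chunk and backward from the last, whichever needs fewer hops. A simplified, hypothetical sketch of just that decision (plain links instead of AtomicChunk, no concurrency):

```java
// Sketch of the direction choice in the quoted jump(): walk forward from
// the head chunk, or backward from the tail, whichever needs fewer hops.
final class JumpSketch {
   static final class Chunk {
      final int index;
      Chunk prev, next;
      Chunk(int index, Chunk prev) { this.index = index; this.prev = prev; }
   }

   // targetChunkIndex counts from the head; backward is cheaper when the
   // distance from the tail is smaller than the distance from the head.
   static Chunk jump(Chunk head, Chunk tail, int targetChunkIndex) {
      final int forwardJumps = targetChunkIndex;
      final int backwardJumps = tail.index - targetChunkIndex;
      Chunk c;
      if (backwardJumps < forwardJumps) {
         c = tail;
         for (int i = 0; i < backwardJumps; i++) c = c.prev;
      } else {
         c = head;
         for (int i = 0; i < forwardJumps; i++) c = c.next;
      }
      return c;
   }
}
```

This is the same idea as the backward search inside `java.util.LinkedList#get(int)`, minus the lock-free subtleties of re-reading the moving producer chunk.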
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246570552 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java --- @@ -864,21 +875,32 @@ private PageCursorInfo processACK(final PagePosition pos) { return info; } + private void installTXCallback(final Transaction tx, final PagePosition position) { + installTXCallback(tx, position, -1); + } + /** * @param tx * @param position +* @param persistentSize if negative it needs to be calculated on the fly */ - private void installTXCallback(final Transaction tx, final PagePosition position) { + private void installTXCallback(final Transaction tx, final PagePosition position, final long persistentSize) { if (position.getRecordID() >= 0) { // It needs to persist, otherwise the cursor will return to the fist page position tx.setContainsPersistent(); } PageCursorInfo info = getPageInfo(position); PageCache cache = info.getCache(); - long size = 0; if (cache != null) { - size = getPersistentSize(cache.getMessage(position.getMessageNr())); + final long size; + if (persistentSize < 0) { --- End diff -- -1 is used as a reserved value elsewhere to trigger the cache lookup ---
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246570293 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java --- @@ -864,21 +875,32 @@ private PageCursorInfo processACK(final PagePosition pos) { return info; } + private void installTXCallback(final Transaction tx, final PagePosition position) { + installTXCallback(tx, position, -1); + } + /** * @param tx * @param position +* @param persistentSize if negative it needs to be calculated on the fly */ - private void installTXCallback(final Transaction tx, final PagePosition position) { + private void installTXCallback(final Transaction tx, final PagePosition position, final long persistentSize) { if (position.getRecordID() >= 0) { // It needs to persist, otherwise the cursor will return to the fist page position tx.setContainsPersistent(); } PageCursorInfo info = getPageInfo(position); PageCache cache = info.getCache(); - long size = 0; if (cache != null) { - size = getPersistentSize(cache.getMessage(position.getMessageNr())); + final long size; + if (persistentSize < 0) { --- End diff -- surely this is checking for something specific like -1, not just that it's negative? If it really accepts any negative value, that would be worrying; this should be more explicit and check exactly for -1, and any other negative value should mean an illegal argument ---
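The suggestion here, an explicit -1 sentinel with all other negative values rejected, might look like the following sketch (hypothetical names, not the actual patch; the fallback computation is passed in as a supplier):

```java
import java.util.function.LongSupplier;

// Sketch of an explicit sentinel instead of "any negative means compute".
final class PersistentSizeSketch {
   static final long UNKNOWN_PERSISTENT_SIZE = -1L;

   static long resolve(long persistentSize, LongSupplier computeOnTheFly) {
      if (persistentSize == UNKNOWN_PERSISTENT_SIZE) {
         // -1 is the agreed "calculate on the fly" marker
         return computeOnTheFly.getAsLong();
      }
      if (persistentSize < 0) {
         // any other negative value is a caller bug, not a request to recompute
         throw new IllegalArgumentException("negative persistentSize: " + persistentSize);
      }
      return persistentSize;
   }
}
```

A named constant makes the overload `installTXCallback(tx, position, -1)` self-documenting at the call site as well.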
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246569626 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { --- End diff -- I know, but in the original version it was handled that way, and it covers 2 cases: the collection is null and the collection doesn't have enough elements ---
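The two cases collapsed into the null return can be seen in a tiny sketch of the lookup contract (an illustrative simplification, not the lock-free implementation):

```java
// Sketch of the null-returning lookup contract discussed above: negative
// indexes and indexes at or beyond the current size both map to null
// rather than an exception, matching the original synchronized version.
final class LookupContractSketch {
   private final Object[] elements;
   private final int size;

   LookupContractSketch(Object[] elements, int size) {
      this.elements = elements;
      this.size = size;
   }

   Object get(int index) {
      if (index < 0 || index >= size) {
         return null; // covers both "not there yet" and "never will be"
      }
      return elements[index];
   }
}
```

Whether out-of-range access should instead throw is exactly the contract question raised in this thread; the null form keeps callers that probe past the live end simple.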
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246569523 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? 
+ if (messageNumber >= size) { +return null; + } + //publish it for others consumers + LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size); + } + final AtomicChunk buffer; + final int offset; + if (messageNumber >= chunkSize) { + offset = messageNumber & chunkMask; + //slow path is moved in a separate method + buffer = jump(messageNumber, size); + } else { + offset = messageNumber; + buffer = consumerBuffer; + } + //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set + PagedMessage msg; + while ((msg = buffer.get(offset)) == null) { + Thread.yield(); + } + return msg; + } + + /** +* Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries +* ie backward search of a node if needed. +*/ + private AtomicChunk jump(final int messageNumber, final long size) { + //fast division by a power of 2 + final int jumps = messageNumber >> chunkSizeLog2; + //size is never allowed to be > Integer.MAX_VALUE + final int lastChunkIndex = (int) size >> chunkSizeLog2; + int requiredJumps = jumps; + AtomicChunk jumpBuffer = null; + boolean jumpForward = true; + int distanceFromLastChunkIndex = lastChunkIndex - jumps; + //it's worth to go backward from lastChunkIndex? 
+ //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer + if (distanceFromLastChunkIndex < jumps) { + final AtomicChunk producer = producerBuffer; + //producer is a potential moving, always increasing, target ie better to re-check the distance + distanceFromLastChunkIndex = producer.index - jumps; + if (distanceFromLastChunkIndex < jumps) { +//we're saving some jumps ie is fine to go backward from here +jumpBuffer = producer; +requiredJumps = distanceFromLastChunkIndex; +jumpForward = false; + } + } + //start from the consumer buffer only is needed + if (jumpBuffer == null) { + jumpBuffer = consumerBuffer; + } + for (int i = 0; i < requiredJumps; i++) { + //next chunk is always set if below a read producerIndex value + //previous chunk is final and can be safely read + jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev; + } + return jumpBuffer; } @Override - public synchronized boolean isLive() { + public boolean isLive() { return isLive; } @Override - public synchronized void addLiveMessage(PagedMessage message) { + public void addLiveMessage(PagedMessage
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568743 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { --- End diff -- surely if this occurs, there's some issue... ---
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568579 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; --- End diff -- the naming of fields is misaligned: in some places it's called size, in others index; this is hard to follow. ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568552 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- […same diff hunk as quoted above, continuing…]
+         if (messageNumber >= size) {
+            return null;
+         }
+         //publish it for others consumers
+         LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+      }
+      final AtomicChunk buffer;
+      final int offset;
+      if (messageNumber >= chunkSize) {
+         offset = messageNumber & chunkMask;
+         //slow path is moved in a separate method
+         buffer = jump(messageNumber, size);
+      } else {
+         offset = messageNumber;
+         buffer = consumerBuffer;
+      }
+      //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set
+      PagedMessage msg;
+      while ((msg = buffer.get(offset)) == null) {
+         Thread.yield();
+      }
+      return msg;
+   }
+
+   /**
+    * Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries
+    * ie backward search of a node if needed.
+    */
+   private AtomicChunk jump(final int messageNumber, final long size) {
+      //fast division by a power of 2
+      final int jumps = messageNumber >> chunkSizeLog2;
+      //size is never allowed to be > Integer.MAX_VALUE
+      final int lastChunkIndex = (int) size >> chunkSizeLog2;
+      int requiredJumps = jumps;
+      AtomicChunk jumpBuffer = null;
+      boolean jumpForward = true;
+      int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+      //it's worth to go backward from lastChunkIndex?
+      //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer
+      if (distanceFromLastChunkIndex < jumps) {
+         final AtomicChunk producer = producerBuffer;
+         //producer is a potential moving, always increasing, target ie better to re-check the distance
+         distanceFromLastChunkIndex = producer.index - jumps;
+         if (distanceFromLastChunkIndex < jumps) {
+            //we're saving some jumps ie is fine to go backward from here
+            jumpBuffer = producer;
+            requiredJumps = distanceFromLastChunkIndex;
+            jumpForward = false;
+         }
+      }
+      //start from the consumer buffer only is needed
+      if (jumpBuffer == null) {
+         jumpBuffer = consumerBuffer;
+      }
+      for (int i = 0; i < requiredJumps; i++) {
+         //next chunk is always set if below a read producerIndex value
+         //previous chunk is final and can be safely read
+         jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+      }
+      return jumpBuffer;
    }

    @Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
       return isLive;
    }

    @Override
-   public synchronized void addLiveMessage(PagedMessage message) {
+   public void addLiveMessage(PagedMessage message) {
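The jump() method quoted above mirrors the bidirectional search that java.util.LinkedList#get(int) performs: walk forward from the first chunk or backward from the last, whichever needs fewer hops. A minimal single-threaded sketch of just that heuristic (the Chunk type and all names here are illustrative, not the PR's):

```java
// Forward-or-backward chunk traversal, as in jump() above but without the
// lock-free re-check of a moving producer chunk.
final class JumpSketch {
   static final class Chunk {
      final int index;
      final Chunk prev;   // prev is final, so it is always safe to read
      Chunk next;
      Chunk(int index, Chunk prev) { this.index = index; this.prev = prev; }
   }

   static Chunk jump(Chunk first, Chunk last, int targetChunkIndex) {
      final int forwardJumps = targetChunkIndex;               // hops from chunk 0
      final int backwardJumps = last.index - targetChunkIndex; // hops from the tail
      Chunk c;
      if (backwardJumps < forwardJumps) {
         c = last;
         for (int i = 0; i < backwardJumps; i++) {
            c = c.prev;
         }
      } else {
         c = first;
         for (int i = 0; i < forwardJumps; i++) {
            c = c.next;
         }
      }
      return c;
   }

   // helper: build a doubly-linked chain of n chunks, returning {first, last}
   static Chunk[] chain(int n) {
      Chunk first = new Chunk(0, null);
      Chunk last = first;
      for (int i = 1; i < n; i++) {
         Chunk c = new Chunk(i, last);
         last.next = c;
         last = c;
      }
      return new Chunk[]{first, last};
   }
}
```

The cost of an indexed read is therefore at most half the chunk count, instead of always linear from the head.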
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568367 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- […same diff hunk as quoted above…]
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246567995 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- […same diff hunk as quoted above…]
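The fast path of getMessage in the diffs above relies on the chunk size being a power of two, so that division and modulo reduce to a shift and a mask. A standalone sketch of that arithmetic (the constant value is illustrative; the PR derives these from the configured chunk size):

```java
// Power-of-two chunk arithmetic: chunkIndex selects the chunk, offsetInChunk
// selects the slot inside it.
final class ChunkMath {
   static final int CHUNK_SIZE = 32;                 // must be a power of two
   static final int CHUNK_SIZE_LOG2 = Integer.numberOfTrailingZeros(CHUNK_SIZE);
   static final int CHUNK_MASK = CHUNK_SIZE - 1;

   static int chunkIndex(int messageNumber) {
      return messageNumber >> CHUNK_SIZE_LOG2;       // messageNumber / CHUNK_SIZE
   }

   static int offsetInChunk(int messageNumber) {
      return messageNumber & CHUNK_MASK;             // messageNumber % CHUNK_SIZE
   }
}
```

For example, message 70 with 32-slot chunks lives in chunk 2 at offset 6.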
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246567905 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -28,15 +28,49 @@ /** * This is the same as PageCache, however this is for the page that's being currently written. */ --- End diff -- Makes sense :+1: ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246566042 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- […same diff hunk as quoted above…]
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246563763 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -28,15 +28,49 @@ /** * This is the same as PageCache, however this is for the page that's being currently written. */ --- End diff -- @franz1981 I'd agree with @clebertsuconic here. A bit like what I've done with priority consumers: I ended up splitting out the collections logic, which has ended up making it cleaner and easier to reason with (and, as you've marked out on its PR, more testable ;) ) ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246558406 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -28,15 +28,49 @@ /** * This is the same as PageCache, however this is for the page that's being currently written. */ --- End diff -- Isn't this mixing the collection implementation itself into the LivePageCache? Isn't there a way to implement this logic in its own structure, like PageCache using a generic ChunkArray (a name I just came up with here)? I'm a bit concerned about maintaining the business side of this issue (that is, the PageCache) together with the speedy implementation of a collection. ---
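The separation Clebert suggests here — the page cache delegating to a generic append-only chunked collection — could look roughly like the following. This is a toy sketch, not the class that eventually landed in the PR: it is single-writer (the real collection is lock-free and multi-producer), and ChunkedArray, like his ChunkArray, is just a placeholder name.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

// Generic append-only list that grows in fixed-size chunks; indexed reads
// walk the chunk chain. The paging code would hold one of these instead of
// embedding the chunk logic itself.
final class ChunkedArray<T> {
   private static final int CHUNK_SIZE = 32;

   private static final class Chunk<T> extends AtomicReferenceArray<T> {
      Chunk<T> next;
      Chunk() { super(CHUNK_SIZE); }
   }

   private final Chunk<T> first = new Chunk<>();
   private Chunk<T> tail = first;
   private int size;

   // single-writer append for brevity; the PR's version is multi-producer
   synchronized void add(T e) {
      final int offset = size % CHUNK_SIZE;
      if (size != 0 && offset == 0) {
         final Chunk<T> c = new Chunk<>();   // current chunk full: link a new one
         tail.next = c;
         tail = c;
      }
      tail.lazySet(offset, e);
      size++;
   }

   synchronized int size() {
      return size;
   }

   T get(int i) {
      Chunk<T> c = first;
      for (int hops = i / CHUNK_SIZE; hops > 0; hops--) {
         c = c.next;
      }
      return c.get(i % CHUNK_SIZE);
   }
}
```

Keeping the collection generic also means it can be unit-tested against plain values, independently of PagedMessage and the paging business logic.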
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
GitHub user franz1981 opened a pull request: https://github.com/apache/activemq-artemis/pull/2494 ARTEMIS-2224 Reduce contention on LivePageCacheImpl

It includes:
- **lock-free LivePageCache + tests**: LivePageCacheImpl has been reimplemented to be lock-free and multi-producer/multi-consumer in all of its operations.
- **Avoid unnecessary page cache queries on ack TX**: PageSubscriptionImpl::ackTx already performs a counter update using the message persistent size: that size can be reused on PagePosition::setPersistentSize, avoiding querying the page cache just to compute it.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/franz1981/activemq-artemis lock-free-paging Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2494.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2494 ---
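The lock-free multi-producer append in the first bullet generally follows a claim-then-publish pattern: each appender claims a unique slot, then stores the element, and readers spin until the slot becomes non-null (exactly what the quoted getMessage does). A minimal sketch, with a fixed-size array standing in for the growing chunk list and names invented here:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Claim-then-publish: producerIndex.getAndIncrement() hands each appender a
// distinct slot with no lock; the store into that slot happens afterwards,
// so readers must tolerate a claimed-but-not-yet-published (null) slot.
final class ClaimThenPublish {
   final AtomicLong producerIndex = new AtomicLong();
   final AtomicReferenceArray<String> slots = new AtomicReferenceArray<>(1024);

   void add(String e) {
      final int slot = (int) producerIndex.getAndIncrement(); // claim
      slots.lazySet(slot, e);                                 // publish
   }

   String get(int i) {
      String e;
      while ((e = slots.get(i)) == null) {
         Thread.yield(); // slot claimed but element not yet visible
      }
      return e;
   }
}
```

The real cache additionally swaps in new chunks (the RESIZING sentinel) when a claim crosses a chunk boundary, which this fixed-capacity sketch omits.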