[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246602226 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java --- @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.IntFunction; + +/** + * This collection is a concurrent append-only list that grows in chunks. + * It's safe to be used by many threads concurrently and has a max capacity of {@link Integer#MAX_VALUE}. + */ +public final class ConcurrentAppendOnlyChunkedList { --- End diff -- Thanks to you guys and one day I will be better in naming things too I swear :D ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246602031 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { --- End diff -- Is not that easy here, this one is not a common collection (in theory, but we can get an agreement on the contract for sure): it is more an append only list that allows indexed queries like a map. If I remember correctly it is similar to an [hashed array tree](https://en.wikipedia.org/wiki/Hashed_array_tree), where the top-level directory is a double linked list of "folders" (instead of an array, like the original implementation): indeed in the code there are chunkIndex (==key of top level directory) and offset (==key of leaf into a directory). ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246601217 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java --- @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.IntFunction; + +/** + * This collection is a concurrent append-only list that grows in chunks. + * It's safe to be used by many threads concurrently and has a max capacity of {@link Integer#MAX_VALUE}. + */ +public final class ConcurrentAppendOnlyChunkedList { --- End diff -- This is a lot better! ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246601169 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { +return null; + } + //publish it for others consumers + LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size); + } + final AtomicChunk buffer; + final int offset; + if (messageNumber >= chunkSize) { + offset = messageNumber & chunkMask; + //slow path is moved in a separate method + buffer = jump(messageNumber, size); + } else { + offset = messageNumber; + buffer = consumerBuffer; + } + //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set + PagedMessage msg; + while ((msg = buffer.get(offset)) == null) { + Thread.yield(); + } + return msg; + } + + /** +* Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries +* ie backward search of a node if needed. +*/ + private AtomicChunk jump(final int messageNumber, final long size) { + //fast division by a power of 2 + final int jumps = messageNumber >> chunkSizeLog2; + //size is never allowed to be > Integer.MAX_VALUE + final int lastChunkIndex = (int) size >> chunkSizeLog2; + int requiredJumps = jumps; + AtomicChunk jumpBuffer = null; + boolean jumpForward = true; + int distanceFromLastChunkIndex = lastChunkIndex - jumps; + //it's worth to go backward from lastChunkIndex? + //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer + if (distanceFromLastChunkIndex < jumps) { + final AtomicChunk producer = producerBuffer; + //producer is a potential moving, always increasing, target ie better to re-check the distance + distanceFromLastChunkIndex = producer.index - jumps; + if (distanceFromLastChunkIndex < jumps) { +//we're saving some jumps ie is fine to go backward from here +jumpBuffer = producer; +requiredJumps = distanceFromLastChunkIndex; +jumpForward = false; + } + } + //start from the consumer buffer only is needed + if (jumpBuffer == null) { + jumpBuffer = consumerBuffer; + } + for (int i = 0; i < requiredJumps; i++) { + //next chunk is always set if below a read producerIndex value + //previous chunk is final and can be safely read + jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev; + } + return jumpBuffer; } @Override - public synchronized boolean isLive() { + public boolean isLive() { return isLive; } @Override - public synchronized void addLiveMessage(PagedMessage message) { + public void addLiveMessage(PagedMessage messag
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r24662 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -28,15 +28,49 @@ /** * This is the same as PageCache, however this is for the page that's being currently written. */ --- End diff -- @clebertsuconic @michaelandrepearce Good points guys: i have re-implemented the logic in a proper collection (but that won't implement canonical `Collection` types, because it is not canonical at all) ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246599701 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { +return null; + } + //publish it for others consumers + LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size); + } + final AtomicChunk buffer; + final int offset; + if (messageNumber >= chunkSize) { + offset = messageNumber & chunkMask; + //slow path is moved in a separate method + buffer = jump(messageNumber, size); + } else { + offset = messageNumber; + buffer = consumerBuffer; + } + //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set + PagedMessage msg; + while ((msg = buffer.get(offset)) == null) { + Thread.yield(); + } + return msg; + } + + /** +* Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries +* ie backward search of a node if needed. +*/ + private AtomicChunk jump(final int messageNumber, final long size) { + //fast division by a power of 2 + final int jumps = messageNumber >> chunkSizeLog2; --- End diff -- eheh you're right :P, but the code of this collection was coming from a version where `chunkSize` was not a static final constant: in the new version is more clear why I have done it ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246578304 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { --- End diff -- if this was a collection, this would almost be throwing illegal arguement. ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246577156 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { +return null; + } + //publish it for others consumers + LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size); + } + final AtomicChunk buffer; + final int offset; + if (messageNumber >= chunkSize) { + offset = messageNumber & chunkMask; + //slow path is moved in a separate method + buffer = jump(messageNumber, size); + } else { + offset = messageNumber; + buffer = consumerBuffer; + } + //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set + PagedMessage msg; + while ((msg = buffer.get(offset)) == null) { + Thread.yield(); + } + return msg; + } + + /** +* Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries +* ie backward search of a node if needed. +*/ + private AtomicChunk jump(final int messageNumber, final long size) { + //fast division by a power of 2 + final int jumps = messageNumber >> chunkSizeLog2; --- End diff -- cough ;) ... for the same comment you left me :P ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r246575326 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java --- @@ -730,22 +793,29 @@ public int deliverMessage(MessageReference messageReference, int deliveryCount, if (preSettle) { // Presettled means the client implicitly accepts any delivery we send it. - sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + try { + sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + } catch (Exception e) { + log.debug(e.getMessage(), e); + } delivery.settle(); } else { sender.advance(); } connection.flush(); } finally { -connection.unlock(); +synchronized (creditsLock) { + pending.decrementAndGet(); +} +if (releaseRequired) { + ((NettyReadable) sendBuffer).getByteBuf().release(); +} } + } catch (Exception e) { + log.warn(e.getMessage(), e); - return size; - } finally { - if (releaseRequired) { -((NettyReadable) sendBuffer).getByteBuf().release(); - } + // important todo: Error treatment --- End diff -- done! ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246574781 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import org.apache.activemq.artemis.core.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the the different collections used for each priority level. + * + * A supplier is required to provide the underlying collection needed when a new priority level is seen, + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics, + * if list, then list semantics. + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware. + */ +public class PriorityCollection extends AbstractCollection { + + private final Supplier> supplier; + private volatile PriorityHolder[] priorityHolders = newPrioritySetArrayInstance(0); + private volatile int size; + + private void setArray(PriorityHolder[] priorityHolders) { + this.priorityHolders = priorityHolders; + } + + private PriorityHolder[] getArray() { + return priorityHolders; + } + + + public PriorityCollection(Supplier> supplier) { + this.supplier = supplier; + } + + @SuppressWarnings("unchecked") + private static PriorityHolder[] newPrioritySetArrayInstance(int length) { + return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + public Set getPriorites() { + PriorityHolder[] snapshot = getArray(); + return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + Iterator[] iterators = getIterators(); + return new MultiIterator<>(iterators); + } + + private Iterator[] getIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + Iterator[] iterators = newIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = snapshot[i].getValues().iterator(); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static Iterator[] newIteratorArrayInstance(int length) { + return (Iterator[]) Array.newInstance(Iterator.class, length); + } + + public ResettableIterator resettableIterator() { + return new MultiResettableIterator(getResettableIterators()); + } + + private ResettableIterator[] getResettableIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + ResettableIterator[] iterators = newResettableIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray()); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static ResettableIterator[] newResettableIteratorArrayInstance(int length) { + retur
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246574775 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { +return null; + } + //publish it for others consumers + LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size); + } + final AtomicChunk buffer; + final int offset; + if (messageNumber >= chunkSize) { + offset = messageNumber & chunkMask; + //slow path is moved in a separate method + buffer = jump(messageNumber, size); + } else { + offset = messageNumber; + buffer = consumerBuffer; + } + //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set + PagedMessage msg; + while ((msg = buffer.get(offset)) == null) { + Thread.yield(); + } + return msg; + } + + /** +* Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries +* ie backward search of a node if needed. +*/ + private AtomicChunk jump(final int messageNumber, final long size) { + //fast division by a power of 2 + final int jumps = messageNumber >> chunkSizeLog2; + //size is never allowed to be > Integer.MAX_VALUE + final int lastChunkIndex = (int) size >> chunkSizeLog2; + int requiredJumps = jumps; + AtomicChunk jumpBuffer = null; + boolean jumpForward = true; + int distanceFromLastChunkIndex = lastChunkIndex - jumps; + //it's worth to go backward from lastChunkIndex? + //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer + if (distanceFromLastChunkIndex < jumps) { + final AtomicChunk producer = producerBuffer; + //producer is a potential moving, always increasing, target ie better to re-check the distance + distanceFromLastChunkIndex = producer.index - jumps; + if (distanceFromLastChunkIndex < jumps) { +//we're saving some jumps ie is fine to go backward from here +jumpBuffer = producer; +requiredJumps = distanceFromLastChunkIndex; +jumpForward = false; + } + } + //start from the consumer buffer only is needed + if (jumpBuffer == null) { + jumpBuffer = consumerBuffer; + } + for (int i = 0; i < requiredJumps; i++) { + //next chunk is always set if below a read producerIndex value + //previous chunk is final and can be safely read + jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev; + } + return jumpBuffer; } @Override - public synchronized boolean isLive() { + public boolean isLive() { return isLive; } @Override - public synchronized void addLiveMessage(PagedMessage message) { + public void addLiveMessage(PagedMessage message) {
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246570552 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java --- @@ -864,21 +875,32 @@ private PageCursorInfo processACK(final PagePosition pos) { return info; } + private void installTXCallback(final Transaction tx, final PagePosition position) { + installTXCallback(tx, position, -1); + } + /** * @param tx * @param position +* @param persistentSize if negative it needs to be calculated on the fly */ - private void installTXCallback(final Transaction tx, final PagePosition position) { + private void installTXCallback(final Transaction tx, final PagePosition position, final long persistentSize) { if (position.getRecordID() >= 0) { // It needs to persist, otherwise the cursor will return to the fist page position tx.setContainsPersistent(); } PageCursorInfo info = getPageInfo(position); PageCache cache = info.getCache(); - long size = 0; if (cache != null) { - size = getPersistentSize(cache.getMessage(position.getMessageNr())); + final long size; + if (persistentSize < 0) { --- End diff -- -1 is used as a reserved value in another point to trigger the cache lookup ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246570293 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java --- @@ -864,21 +875,32 @@ private PageCursorInfo processACK(final PagePosition pos) { return info; } + private void installTXCallback(final Transaction tx, final PagePosition position) { + installTXCallback(tx, position, -1); + } + /** * @param tx * @param position +* @param persistentSize if negative it needs to be calculated on the fly */ - private void installTXCallback(final Transaction tx, final PagePosition position) { + private void installTXCallback(final Transaction tx, final PagePosition position, final long persistentSize) { if (position.getRecordID() >= 0) { // It needs to persist, otherwise the cursor will return to the fist page position tx.setContainsPersistent(); } PageCursorInfo info = getPageInfo(position); PageCache cache = info.getCache(); - long size = 0; if (cache != null) { - size = getPersistentSize(cache.getMessage(position.getMessageNr())); + final long size; + if (persistentSize < 0) { --- End diff -- surely this is checking for something like if its -1? not just that its negative which would be worrying if so this should be more explicit to just be checking -1, and if anything else thats negative, means illegal argument ---
[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...
Github user asfgit closed the pull request at: https://github.com/apache/activemq-artemis/pull/2483 ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246569626 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { --- End diff -- I know but in the original version it was handled in that way and it covers 2 case: the collection is null and the collection hasn't enough element ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246569523 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { +return null; + } + //publish it for others consumers + LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size); + } + final AtomicChunk buffer; + final int offset; + if (messageNumber >= chunkSize) { + offset = messageNumber & chunkMask; + //slow path is moved in a separate method + buffer = jump(messageNumber, size); + } else { + offset = messageNumber; + buffer = consumerBuffer; + } + //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set + PagedMessage msg; + while ((msg = buffer.get(offset)) == null) { + Thread.yield(); + } + return msg; + } + + /** +* Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries +* ie backward search of a node if needed. +*/ + private AtomicChunk jump(final int messageNumber, final long size) { + //fast division by a power of 2 + final int jumps = messageNumber >> chunkSizeLog2; + //size is never allowed to be > Integer.MAX_VALUE + final int lastChunkIndex = (int) size >> chunkSizeLog2; + int requiredJumps = jumps; + AtomicChunk jumpBuffer = null; + boolean jumpForward = true; + int distanceFromLastChunkIndex = lastChunkIndex - jumps; + //it's worth to go backward from lastChunkIndex? + //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer + if (distanceFromLastChunkIndex < jumps) { + final AtomicChunk producer = producerBuffer; + //producer is a potential moving, always increasing, target ie better to re-check the distance + distanceFromLastChunkIndex = producer.index - jumps; + if (distanceFromLastChunkIndex < jumps) { +//we're saving some jumps ie is fine to go backward from here +jumpBuffer = producer; +requiredJumps = distanceFromLastChunkIndex; +jumpForward = false; + } + } + //start from the consumer buffer only is needed + if (jumpBuffer == null) { + jumpBuffer = consumerBuffer; + } + for (int i = 0; i < requiredJumps; i++) { + //next chunk is always set if below a read producerIndex value + //previous chunk is final and can be safely read + jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev; + } + return jumpBuffer; } @Override - public synchronized boolean isLive() { + public boolean isLive() { return isLive; } @Override - public synchronized void addLiveMessage(PagedMessage message) { + public void addLiveMessage(PagedMessage me
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568743 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { --- End diff -- surely if this occurs, theres some issue.. ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568579 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; --- End diff -- naming of fields is missaligned, some places its called size, some others you call it index, this is hard to follow. ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568552 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { +return null; + } + //publish it for others consumers + LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size); + } + final AtomicChunk buffer; + final int offset; + if (messageNumber >= chunkSize) { + offset = messageNumber & chunkMask; + //slow path is moved in a separate method + buffer = jump(messageNumber, size); + } else { + offset = messageNumber; + buffer = consumerBuffer; + } + //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set + PagedMessage msg; + while ((msg = buffer.get(offset)) == null) { + Thread.yield(); + } + return msg; + } + + /** +* Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries +* ie backward search of a node if needed. +*/ + private AtomicChunk jump(final int messageNumber, final long size) { + //fast division by a power of 2 + final int jumps = messageNumber >> chunkSizeLog2; + //size is never allowed to be > Integer.MAX_VALUE + final int lastChunkIndex = (int) size >> chunkSizeLog2; + int requiredJumps = jumps; + AtomicChunk jumpBuffer = null; + boolean jumpForward = true; + int distanceFromLastChunkIndex = lastChunkIndex - jumps; + //it's worth to go backward from lastChunkIndex? + //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer + if (distanceFromLastChunkIndex < jumps) { + final AtomicChunk producer = producerBuffer; + //producer is a potential moving, always increasing, target ie better to re-check the distance + distanceFromLastChunkIndex = producer.index - jumps; + if (distanceFromLastChunkIndex < jumps) { +//we're saving some jumps ie is fine to go backward from here +jumpBuffer = producer; +requiredJumps = distanceFromLastChunkIndex; +jumpForward = false; + } + } + //start from the consumer buffer only is needed + if (jumpBuffer == null) { + jumpBuffer = consumerBuffer; + } + for (int i = 0; i < requiredJumps; i++) { + //next chunk is always set if below a read producerIndex value + //previous chunk is final and can be safely read + jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev; + } + return jumpBuffer; } @Override - public synchronized boolean isLive() { + public boolean isLive() { return isLive; } @Override - public synchronized void addLiveMessage(PagedMessage message) { + public void addLiveMessage(PagedMessage message) {
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568367 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { +return null; + } + //publish it for others consumers + LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size); + } + final AtomicChunk buffer; + final int offset; + if (messageNumber >= chunkSize) { + offset = messageNumber & chunkMask; + //slow path is moved in a separate method + buffer = jump(messageNumber, size); + } else { + offset = messageNumber; + buffer = consumerBuffer; + } + //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set + PagedMessage msg; + while ((msg = buffer.get(offset)) == null) { + Thread.yield(); + } + return msg; + } + + /** +* Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries +* ie backward search of a node if needed. +*/ + private AtomicChunk jump(final int messageNumber, final long size) { + //fast division by a power of 2 + final int jumps = messageNumber >> chunkSizeLog2; + //size is never allowed to be > Integer.MAX_VALUE + final int lastChunkIndex = (int) size >> chunkSizeLog2; + int requiredJumps = jumps; + AtomicChunk jumpBuffer = null; + boolean jumpForward = true; + int distanceFromLastChunkIndex = lastChunkIndex - jumps; + //it's worth to go backward from lastChunkIndex? + //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer + if (distanceFromLastChunkIndex < jumps) { + final AtomicChunk producer = producerBuffer; + //producer is a potential moving, always increasing, target ie better to re-check the distance + distanceFromLastChunkIndex = producer.index - jumps; + if (distanceFromLastChunkIndex < jumps) { +//we're saving some jumps ie is fine to go backward from here +jumpBuffer = producer; +requiredJumps = distanceFromLastChunkIndex; +jumpForward = false; + } + } + //start from the consumer buffer only is needed + if (jumpBuffer == null) { + jumpBuffer = consumerBuffer; + } + for (int i = 0; i < requiredJumps; i++) { + //next chunk is always set if below a read producerIndex value + //previous chunk is final and can be safely read + jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev; + } + return jumpBuffer; } @Override - public synchronized boolean isLive() { + public boolean isLive() { return isLive; } @Override - public synchronized void addLiveMessage(PagedMessage message) { + public void addLiveMessage(PagedMessage message) {
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246567905 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -28,15 +28,49 @@ /** * This is the same as PageCache, however this is for the page that's being currently written. */ --- End diff -- Makes sense :+1: ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246567995 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { +return null; + } + //publish it for others consumers + LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size); + } + final AtomicChunk buffer; + final int offset; + if (messageNumber >= chunkSize) { + offset = messageNumber & chunkMask; + //slow path is moved in a separate method + buffer = jump(messageNumber, size); + } else { + offset = messageNumber; + buffer = consumerBuffer; + } + //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set + PagedMessage msg; + while ((msg = buffer.get(offset)) == null) { + Thread.yield(); + } + return msg; + } + + /** +* Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries +* ie backward search of a node if needed. +*/ + private AtomicChunk jump(final int messageNumber, final long size) { + //fast division by a power of 2 + final int jumps = messageNumber >> chunkSizeLog2; + //size is never allowed to be > Integer.MAX_VALUE + final int lastChunkIndex = (int) size >> chunkSizeLog2; + int requiredJumps = jumps; + AtomicChunk jumpBuffer = null; + boolean jumpForward = true; + int distanceFromLastChunkIndex = lastChunkIndex - jumps; + //it's worth to go backward from lastChunkIndex? + //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer + if (distanceFromLastChunkIndex < jumps) { + final AtomicChunk producer = producerBuffer; + //producer is a potential moving, always increasing, target ie better to re-check the distance + distanceFromLastChunkIndex = producer.index - jumps; + if (distanceFromLastChunkIndex < jumps) { +//we're saving some jumps ie is fine to go backward from here +jumpBuffer = producer; +requiredJumps = distanceFromLastChunkIndex; +jumpForward = false; + } + } + //start from the consumer buffer only is needed + if (jumpBuffer == null) { + jumpBuffer = consumerBuffer; + } + for (int i = 0; i < requiredJumps; i++) { + //next chunk is always set if below a read producerIndex value + //previous chunk is final and can be safely read + jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev; + } + return jumpBuffer; } @Override - public synchronized boolean isLive() { + public boolean isLive() { return isLive; } @Override - public synchronized void addLiveMessage(PagedMessage message) { + public void addLiveMessage(PagedMessage me
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246566042 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -48,54 +82,228 @@ public long getPageId() { } @Override - public synchronized int getNumberOfMessages() { - return messages.size(); + public int getNumberOfMessages() { + while (true) { + final long size = producerIndex; + if (size == RESIZING) { +Thread.yield(); +continue; + } + return (int) Math.min(size, Integer.MAX_VALUE); + } } @Override - public synchronized void setMessages(PagedMessage[] messages) { + public void setMessages(PagedMessage[] messages) { // This method shouldn't be called on liveCache, but we will provide the implementation for it anyway for (PagedMessage msg : messages) { addLiveMessage(msg); } } @Override - public synchronized PagedMessage getMessage(int messageNumber) { - if (messageNumber < messages.size()) { - return messages.get(messageNumber); - } else { + public PagedMessage getMessage(int messageNumber) { + if (messageNumber < 0) { return null; } + //it allow to perform less cache invalidations vs producerIndex if there are bursts of appends + long size = lastSeenProducerIndex; + if (messageNumber >= size) { + while ((size = producerIndex) == RESIZING) { +Thread.yield(); + } + //it is a message over the current size? + if (messageNumber >= size) { +return null; + } + //publish it for others consumers + LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size); + } + final AtomicChunk buffer; + final int offset; + if (messageNumber >= chunkSize) { + offset = messageNumber & chunkMask; + //slow path is moved in a separate method + buffer = jump(messageNumber, size); + } else { + offset = messageNumber; + buffer = consumerBuffer; + } + //NOTE: producerIndex is being updated before setting a new value ie on consumer side need to spin until a not null value is set + PagedMessage msg; + while ((msg = buffer.get(offset)) == null) { + Thread.yield(); + } + return msg; + } + + /** +* Implements a lock-free version of the optimization used on {@link java.util.LinkedList#get(int)} to speed up queries +* ie backward search of a node if needed. +*/ + private AtomicChunk jump(final int messageNumber, final long size) { + //fast division by a power of 2 + final int jumps = messageNumber >> chunkSizeLog2; + //size is never allowed to be > Integer.MAX_VALUE + final int lastChunkIndex = (int) size >> chunkSizeLog2; + int requiredJumps = jumps; + AtomicChunk jumpBuffer = null; + boolean jumpForward = true; + int distanceFromLastChunkIndex = lastChunkIndex - jumps; + //it's worth to go backward from lastChunkIndex? + //trying first to check against the value we already have: if it won't worth, won't make sense to load the producerBuffer + if (distanceFromLastChunkIndex < jumps) { + final AtomicChunk producer = producerBuffer; + //producer is a potential moving, always increasing, target ie better to re-check the distance + distanceFromLastChunkIndex = producer.index - jumps; + if (distanceFromLastChunkIndex < jumps) { +//we're saving some jumps ie is fine to go backward from here +jumpBuffer = producer; +requiredJumps = distanceFromLastChunkIndex; +jumpForward = false; + } + } + //start from the consumer buffer only is needed + if (jumpBuffer == null) { + jumpBuffer = consumerBuffer; + } + for (int i = 0; i < requiredJumps; i++) { + //next chunk is always set if below a read producerIndex value + //previous chunk is final and can be safely read + jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev; + } + return jumpBuffer; } @Override - public synchronized boolean isLive() { + public boolean isLive() { return isLive; } @Override - public synchronized void addLiveMessage(PagedMessage message) { + public void addLiveMessage(PagedMessage me
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246563763 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -28,15 +28,49 @@ /** * This is the same as PageCache, however this is for the page that's being currently written. */ --- End diff -- @franz1981 Id agree with @clebertsuconic here, a bit like what ive done with priority consumers, i ended up splitting out the collections logic, which has ended up making it cleaner, and easier to reason with. (and as youve marked out on its pr, more testable ;) ) ---
[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...
Github user jbertram commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2466#discussion_r246563323 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTQueueCleanTest.java --- @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.mqtt.imported; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.Queue; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +public class MQTTQueueCleanTest extends MQTTTestSupport { + + private static final ActiveMQServerLogger log = ActiveMQServerLogger.LOGGER; + + @Test + public void testQueueCleanWhenConnectionSynExeConnectAndDisconnect() throws Exception { + Random random = new Random(); + Set clientProviders = new HashSet<>(11); + int repeatCount = 0; + String address = "clean/test"; + String clientId = "sameClientId"; + String queueName = "::sameClientId.clean.test"; + //The abnormal scene does not necessarily occur, repeating 100 times to ensure the recurrence of the abnormality + while (repeatCount < 100) { + repeatCount++; + int subConnectionCount = random.nextInt(50) + 1; + int sC = 0; + try { +//Reconnect at least twice to reproduce the problem +while (sC < subConnectionCount) { + sC++; + MQTTClientProvider clientProvider = getMQTTClientProvider(); + clientProvider.setClientId(clientId); + initializeConnection(clientProvider); + clientProviders.add(clientProvider); + clientProvider.subscribe(address, AT_LEAST_ONCE); +} + } finally { +for (MQTTClientProvider clientProvider : clientProviders) { + clientProvider.disconnect(); +} +clientProviders.clear(); +assertTrue(waitForBindings(server, queueName, false, 0, 0, 1)); --- End diff -- From what I can tell the overridden version of `waitForBindings` isn't necessary. You could just use something like: `assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueName)) == null));` ---
[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...
Github user jbertram commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2466#discussion_r246562512 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java --- @@ -3604,4 +3605,14 @@ private void deployReloadableConfigFromConfiguration() throws Exception { return externalComponents; } + @Override + public Set queueConsumersQuery(SimpleString queueName) { --- End diff -- This method isn't necessary. You can use the `locateQueue` method and simply invoke `getConsumers` on the returned value. ---
[GitHub] activemq-artemis pull request #2489: ARTEMIS-2220 Fix PageCursorStressTest::...
Github user asfgit closed the pull request at: https://github.com/apache/activemq-artemis/pull/2489 ---
[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...
Github user asfgit closed the pull request at: https://github.com/apache/activemq-artemis/pull/2491 ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2494#discussion_r246558406 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java --- @@ -28,15 +28,49 @@ /** * This is the same as PageCache, however this is for the page that's being currently written. */ --- End diff -- Isn't this mixing the collection implementation itself into the LivePageCache? isn't there a way to implement this logic into its own structure? Like PageCache using a generic ChunkArray (a name I just came up here)? I'm a bit concerned on maintaining the business side of this issue (that is the PageCache) with the speedy implementation of a collection. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246558298 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +/** + * This class's purpose is to hold the consumers. + * + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe, + * but also lock less for a read path, which is our HOT path. + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers. + * + * There can only be one resettable iterable view, + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset, + * + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator(). + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl implements QueueConsumers { + + private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator"); + private volatile ResettableIterator changedIterator; + private ResettableIterator currentIterator = consumers.resettableIterator(); + + @Override + public Set getPriorites() { + return consumers.getPriorites(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public T next() { + return currentIterator.next(); + } + + @Override + public QueueConsumers reset() { + if (changedIterator != null) { + currentIterator = changedIterator; + changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null); --- End diff -- currentIterator is ONLY updated by reset, which should ONLY be called by the same threads operating on the ResettableIterator interface. When add or remove of consumer occurs a new iterator is parked into the volatile changedIterator (using an atomic field updater), so the next reset can pick it up. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246557671 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import org.apache.activemq.artemis.core.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the the different collections used for each priority level. + * + * A supplier is required to provide the underlying collection needed when a new priority level is seen, + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics, + * if list, then list semantics. + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware. + */ +public class PriorityCollection extends AbstractCollection { + + private final Supplier> supplier; + private volatile PriorityHolder[] priorityHolders = newPrioritySetArrayInstance(0); + private volatile int size; + + private void setArray(PriorityHolder[] priorityHolders) { + this.priorityHolders = priorityHolders; + } + + private PriorityHolder[] getArray() { + return priorityHolders; + } + + + public PriorityCollection(Supplier> supplier) { + this.supplier = supplier; + } + + @SuppressWarnings("unchecked") + private static PriorityHolder[] newPrioritySetArrayInstance(int length) { + return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + public Set getPriorites() { + PriorityHolder[] snapshot = getArray(); + return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + Iterator[] iterators = getIterators(); + return new MultiIterator<>(iterators); + } + + private Iterator[] getIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + Iterator[] iterators = newIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = snapshot[i].getValues().iterator(); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static Iterator[] newIteratorArrayInstance(int length) { + return (Iterator[]) Array.newInstance(Iterator.class, length); + } + + public ResettableIterator resettableIterator() { + return new MultiResettableIterator(getResettableIterators()); + } + + private ResettableIterator[] getResettableIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + ResettableIterator[] iterators = newResettableIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray()); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static ResettableIterator[] newResettableIteratorArrayInstance(int length) { + return (Resett
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246555304 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +/** + * This class's purpose is to hold the consumers. + * + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe, + * but also lock less for a read path, which is our HOT path. + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers. + * + * There can only be one resettable iterable view, + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset, + * + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator(). + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl implements QueueConsumers { + + private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator"); + private volatile ResettableIterator changedIterator; + private ResettableIterator currentIterator = consumers.resettableIterator(); + + @Override + public Set getPriorites() { + return consumers.getPriorites(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public T next() { + return currentIterator.next(); + } + + @Override + public QueueConsumers reset() { + if (changedIterator != null) { + currentIterator = changedIterator; + changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null); + } else { + currentIterator.reset(); + } + return this; + } + + @Override + public boolean add(T t) { + boolean result = consumers.add(t); + if (result) { + changedIteratorFieldUpdater.set(this, consumers.resettableIterator()); --- End diff -- see other comment ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246555040 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Iterator; + +/** + * Provides an Iterator that works over multiple underlying iterators. + * + * @param type of the class of the iterator. + */ +public class MultiIterator implements Iterator { --- End diff -- OK, so to do this and keep the cleaner user interface what i can do is make an abstract base which has most of this logic in, and have the more ugly generics. And then make MultiIterator extend that which then keeps its cleaner generics, and like wise MultiResettableIterator. Ill update PR to see if this is good compromise, or if you'd prefer me to revert and simply keep the cast i had originally. ---
[GitHub] activemq-artemis pull request #2492: ARTEMIS-2222 why the position remains u...
Github user asfgit closed the pull request at: https://github.com/apache/activemq-artemis/pull/2492 ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246551345 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +/** + * This class's purpose is to hold the consumers. + * + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe, + * but also lock less for a read path, which is our HOT path. + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers. + * + * There can only be one resettable iterable view, + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset, + * + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator(). + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl implements QueueConsumers { + + private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator"); + private volatile ResettableIterator changedIterator; + private ResettableIterator currentIterator = consumers.resettableIterator(); + + @Override + public Set getPriorites() { + return consumers.getPriorites(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public T next() { + return currentIterator.next(); + } + + @Override + public QueueConsumers reset() { + if (changedIterator != null) { + currentIterator = changedIterator; + changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null); --- End diff -- Tomorrow I will give another look but it doesn't seems correct to me if this code can be run by different threads: currentIterator is not guarded by any atomic operations to be safely published and changedIterator is volatile load 3 times before hitting the case... ---
[GitHub] activemq-artemis pull request #2492: ARTEMIS-2222 why the position remains u...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2492#discussion_r246538095 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java --- @@ -1315,9 +1315,7 @@ private PagedReference moveNext() { } } - if (!ignored) { - position = message.getPosition(); - } + position = message.getPosition(); --- End diff -- I think this made sense at some point, but after a few fixes later it's harmless. I don't think this needs a JIRA as there's no issue I think. I will merge and keep the JIRA, but I will rename the commit from being a question into something more affirmative. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246525755 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +/** + * This class's purpose is to hold the consumers. + * + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe, + * but also lock less for a read path, which is our HOT path. + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers. + * + * There can only be one resettable iterable view, + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset, + * + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator(). + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl implements QueueConsumers { + + private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator"); + private volatile ResettableIterator changedIterator; + private ResettableIterator currentIterator = consumers.resettableIterator(); + + @Override + public Set getPriorites() { + return consumers.getPriorites(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public T next() { + return currentIterator.next(); + } + + @Override + public QueueConsumers reset() { + if (changedIterator != null) { + currentIterator = changedIterator; + changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null); + } else { + currentIterator.reset(); --- End diff -- The compare and set is because the new iterator could be created whilst we are mid resetting. The iterator is single thread usage only. Updating the collection should be thread safe and able to occur in parralell. This is much like iterating copyonwriteareatlist. Iterator works on a snapshot view and this is single thread. Updating the collection itself can be concurrent and occur whilst someone else is iterating ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246523218 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Iterator; + +/** + * Provides an Iterator that works over multiple underlying iterators. + * + * @param type of the class of the iterator. + */ +public class MultiIterator implements Iterator { + + private final Iterator[] iterators; + int index = -1; --- End diff -- Agreed ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246521520 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Iterator; + +/** + * Provides an Iterator that works over multiple underlying iterators. + * + * @param type of the class of the iterator. + */ +public class MultiIterator implements Iterator { --- End diff -- Yeah i had a version like that but it makes using simple MultiIterator ugly to use. Uneededly. And to extend if needed only needs a cast. In case of resetable. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520970 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +/** + * Extends MultiIterator, adding the ability if the underlying iterators are resettable, then its self can reset. + * It achieves this by going back to the first iterator, and as moves to another iterator it resets it. + * + * @param type of the class of the iterator. + */ +public class MultiResettableIterator extends MultiIterator implements ResettableIterator { --- End diff -- You could but this then makes using MutiIterator ugly as hell with little gain ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520411 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import org.apache.activemq.artemis.core.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the the different collections used for each priority level. + * + * A supplier is required to provide the underlying collection needed when a new priority level is seen, + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics, + * if list, then list semantics. + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware. + */ +public class PriorityCollection extends AbstractCollection { + + private final Supplier> supplier; + private volatile PriorityHolder[] priorityHolders = newPrioritySetArrayInstance(0); + private volatile int size; + + private void setArray(PriorityHolder[] priorityHolders) { + this.priorityHolders = priorityHolders; + } + + private PriorityHolder[] getArray() { + return priorityHolders; + } + + + public PriorityCollection(Supplier> supplier) { + this.supplier = supplier; + } + + @SuppressWarnings("unchecked") + private static PriorityHolder[] newPrioritySetArrayInstance(int length) { + return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + public Set getPriorites() { + PriorityHolder[] snapshot = getArray(); + return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + Iterator[] iterators = getIterators(); + return new MultiIterator<>(iterators); + } + + private Iterator[] getIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + Iterator[] iterators = newIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = snapshot[i].getValues().iterator(); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static Iterator[] newIteratorArrayInstance(int length) { + return (Iterator[]) Array.newInstance(Iterator.class, length); + } + + public ResettableIterator resettableIterator() { + return new MultiResettableIterator(getResettableIterators()); + } + + private ResettableIterator[] getResettableIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + ResettableIterator[] iterators = newResettableIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray()); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static ResettableIterator[] newResettableIteratorArrayInstance(int length) { + retur
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520138 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import org.apache.activemq.artemis.core.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the the different collections used for each priority level. + * + * A supplier is required to provide the underlying collection needed when a new priority level is seen, + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics, + * if list, then list semantics. + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware. + */ +public class PriorityCollection extends AbstractCollection { + + private final Supplier> supplier; + private volatile PriorityHolder[] priorityHolders = newPrioritySetArrayInstance(0); + private volatile int size; + + private void setArray(PriorityHolder[] priorityHolders) { + this.priorityHolders = priorityHolders; + } + + private PriorityHolder[] getArray() { + return priorityHolders; + } + + + public PriorityCollection(Supplier> supplier) { + this.supplier = supplier; + } + + @SuppressWarnings("unchecked") + private static PriorityHolder[] newPrioritySetArrayInstance(int length) { + return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + public Set getPriorites() { + PriorityHolder[] snapshot = getArray(); + return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + Iterator[] iterators = getIterators(); + return new MultiIterator<>(iterators); + } + + private Iterator[] getIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + Iterator[] iterators = newIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = snapshot[i].getValues().iterator(); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static Iterator[] newIteratorArrayInstance(int length) { + return (Iterator[]) Array.newInstance(Iterator.class, length); + } + + public ResettableIterator resettableIterator() { + return new MultiResettableIterator(getResettableIterators()); + } + + private ResettableIterator[] getResettableIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + ResettableIterator[] iterators = newResettableIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray()); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static ResettableIterator[] newResettableIteratorArrayInstance(int length) { + retur
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520110 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +/** + * This class's purpose is to hold the consumers. + * + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe, + * but also lock less for a read path, which is our HOT path. + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers. + * + * There can only be one resettable iterable view, + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset, + * + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator(). + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl implements QueueConsumers { + + private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator"); + private volatile ResettableIterator changedIterator; + private ResettableIterator currentIterator = consumers.resettableIterator(); + + @Override + public Set getPriorites() { + return consumers.getPriorites(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public T next() { + return currentIterator.next(); + } + + @Override + public QueueConsumers reset() { + if (changedIterator != null) { + currentIterator = changedIterator; + changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null); + } else { + currentIterator.reset(); --- End diff -- So it is a good reason to save using a `compareAndSet` that's not only expensive but give the false illusion that the code is thread-safe ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520121 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import org.apache.activemq.artemis.core.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the the different collections used for each priority level. + * + * A supplier is required to provide the underlying collection needed when a new priority level is seen, + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics, + * if list, then list semantics. + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware. + */ +public class PriorityCollection extends AbstractCollection { + + private final Supplier> supplier; + private volatile PriorityHolder[] priorityHolders = newPrioritySetArrayInstance(0); + private volatile int size; + + private void setArray(PriorityHolder[] priorityHolders) { + this.priorityHolders = priorityHolders; + } + + private PriorityHolder[] getArray() { + return priorityHolders; + } + + + public PriorityCollection(Supplier> supplier) { + this.supplier = supplier; + } + + @SuppressWarnings("unchecked") + private static PriorityHolder[] newPrioritySetArrayInstance(int length) { + return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + public Set getPriorites() { + PriorityHolder[] snapshot = getArray(); + return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + Iterator[] iterators = getIterators(); + return new MultiIterator<>(iterators); + } + + private Iterator[] getIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + Iterator[] iterators = newIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = snapshot[i].getValues().iterator(); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static Iterator[] newIteratorArrayInstance(int length) { + return (Iterator[]) Array.newInstance(Iterator.class, length); + } + + public ResettableIterator resettableIterator() { + return new MultiResettableIterator(getResettableIterators()); + } + + private ResettableIterator[] getResettableIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + ResettableIterator[] iterators = newResettableIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray()); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static ResettableIterator[] newResettableIteratorArrayInstance(int length) { + retur
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246518747 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +/** + * This class's purpose is to hold the consumers. + * + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe, + * but also lock less for a read path, which is our HOT path. + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers. + * + * There can only be one resettable iterable view, + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset, + * + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator(). + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl implements QueueConsumers { + + private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator"); + private volatile ResettableIterator changedIterator; + private ResettableIterator currentIterator = consumers.resettableIterator(); + + @Override + public Set getPriorites() { + return consumers.getPriorites(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public T next() { + return currentIterator.next(); + } + + @Override + public QueueConsumers reset() { + if (changedIterator != null) { --- End diff -- Nice ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246518669 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java --- @@ -84,6 +94,10 @@ public void setFilterString(SimpleString filterString) { this.filterString = filterString; } + public void setPriority(byte priority) { --- End diff -- Keeping inline with the rest of the fields and general approach. Id rather keep to status quo ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246518313 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +/** + * This class's purpose is to hold the consumers. + * + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe, + * but also lock less for a read path, which is our HOT path. + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers. + * + * There can only be one resettable iterable view, + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset, + * + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator(). + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl implements QueueConsumers { + + private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator"); + private volatile ResettableIterator changedIterator; + private ResettableIterator currentIterator = consumers.resettableIterator(); + + @Override + public Set getPriorites() { + return consumers.getPriorites(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public T next() { + return currentIterator.next(); + } + + @Override + public QueueConsumers reset() { + if (changedIterator != null) { + currentIterator = changedIterator; + changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null); --- End diff -- If it fails it means whilst we were reseting and changing reference to the new iterator. a consumer was added or removed and another new iterator was created (as this can occur whilsy we are iterating or resetting.) And means simply whilst this reset updated its iterator, on next reset it needs to switch over its again. We wouldnt want to lazy set to null ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246516870 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +/** + * This class's purpose is to hold the consumers. + * + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe, + * but also lock less for a read path, which is our HOT path. + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers. + * + * There can only be one resettable iterable view, + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset, + * + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator(). + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl implements QueueConsumers { + + private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator"); + private volatile ResettableIterator changedIterator; + private ResettableIterator currentIterator = consumers.resettableIterator(); + + @Override + public Set getPriorites() { + return consumers.getPriorites(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public T next() { + return currentIterator.next(); + } + + @Override + public QueueConsumers reset() { + if (changedIterator != null) { + currentIterator = changedIterator; + changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null); + } else { + currentIterator.reset(); --- End diff -- Like any iterator and iterator should only be interacted by one thread at a time. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246516479 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +/** + * This class's purpose is to hold the consumers. + * + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe, + * but also lock less for a read path, which is our HOT path. + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers. + * + * There can only be one resettable iterable view, + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset, + * + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator(). + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl implements QueueConsumers { + + private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator"); + private volatile ResettableIterator changedIterator; + private ResettableIterator currentIterator = consumers.resettableIterator(); + + @Override + public Set getPriorites() { + return consumers.getPriorites(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public T next() { + return currentIterator.next(); + } + + @Override + public QueueConsumers reset() { + if (changedIterator != null) { + currentIterator = changedIterator; + changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null); + } else { + currentIterator.reset(); + } + return this; + } + + @Override + public boolean add(T t) { + boolean result = consumers.add(t); + if (result) { + changedIteratorFieldUpdater.set(this, consumers.resettableIterator()); + } + return result; + } + + @Override + public boolean remove(T t) { + boolean result = consumers.remove(t); + if (result) { + changedIteratorFieldUpdater.set(this, consumers.resettableIterator()); --- End diff -- Agreed. But this is not hot path, and if anything would allow in future for consumers to be concurrently added or removed without the existing bit sync blocks there is today in queueimpl ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246515810 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java --- @@ -2409,14 +2410,10 @@ private void deliver() { // Either the iterator is empty or the consumer is busy int noDelivery = 0; - int size = 0; - - int endPos = -1; - int handled = 0; long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT; - + consumers.reset(); --- End diff -- Its fine. We actually only want to reset on a succesful handled nomatch or expired. Or before starting iterating ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246515455 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java --- @@ -2497,45 +2484,36 @@ private void deliver() { handled++; - + consumers.reset(); continue; } if (logger.isTraceEnabled()) { logger.trace("Queue " + this.getName() + " is delivering reference " + ref); } - // If a group id is set, then this overrides the consumer chosen round-robin - - SimpleString groupID = extractGroupID(ref); - - if (groupID != null) { - groupConsumer = groups.get(groupID); + final SimpleString groupID = extractGroupID(ref); + groupConsumer = getGroupConsumer(groupConsumer, groupID); - if (groupConsumer != null) { - consumer = groupConsumer; - } - } - - if (exclusive && redistributor == null) { - consumer = consumerList.get(0).consumer; + if (groupConsumer != null) { + consumer = groupConsumer; } HandleStatus status = handle(ref, consumer); if (status == HandleStatus.HANDLED) { - deliveriesInTransit.countUp(); - - handledconsumer = consumer; - - removeMessageReference(holder, ref); - if (redistributor == null) { handleMessageGroup(ref, consumer, groupConsumer, groupID); } + deliveriesInTransit.countUp(); + + + removeMessageReference(holder, ref); + handledconsumer = consumer; handled++; + consumers.reset(); --- End diff -- Yes its fine. Would have no negative effect, and actually have same behaviour as old. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246513536 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import org.apache.activemq.artemis.core.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the the different collections used for each priority level. + * + * A supplier is required to provide the underlying collection needed when a new priority level is seen, + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics, + * if list, then list semantics. + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware. + */ +public class PriorityCollection extends AbstractCollection { + + private final Supplier> supplier; + private volatile PriorityHolder[] priorityHolders = newPrioritySetArrayInstance(0); + private volatile int size; + + private void setArray(PriorityHolder[] priorityHolders) { + this.priorityHolders = priorityHolders; + } + + private PriorityHolder[] getArray() { + return priorityHolders; + } + + + public PriorityCollection(Supplier> supplier) { + this.supplier = supplier; + } + + @SuppressWarnings("unchecked") + private static PriorityHolder[] newPrioritySetArrayInstance(int length) { + return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + public Set getPriorites() { + PriorityHolder[] snapshot = getArray(); + return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + Iterator[] iterators = getIterators(); + return new MultiIterator<>(iterators); + } + + private Iterator[] getIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + Iterator[] iterators = newIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = snapshot[i].getValues().iterator(); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static Iterator[] newIteratorArrayInstance(int length) { + return (Iterator[]) Array.newInstance(Iterator.class, length); + } + + public ResettableIterator resettableIterator() { + return new MultiResettableIterator(getResettableIterators()); + } + + private ResettableIterator[] getResettableIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + ResettableIterator[] iterators = newResettableIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray()); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static ResettableIterator[] newResettableIteratorArrayInstance(int length) { + retur
[GitHub] activemq-artemis pull request #:
Github user clebertsuconic commented on the pull request: https://github.com/apache/activemq-artemis/commit/0f9bf15788096ad9dc795954174c3b496861932c#commitcomment-31886537 In artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java: In artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java on line 347: @andytaylor I had a lot of headache here. Weeks testing this while sitting close to a production/test environment. I would double check this carefully. This kind of thing is hard to be tested. ---
[GitHub] activemq-artemis pull request #2485: ARTEMIS-2217 ‘MQTTSessionState’ in ...
Github user onlyMIT closed the pull request at: https://github.com/apache/activemq-artemis/pull/2485 ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246416127 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Test various behaviors of AMQP receivers with the broker. + */ +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport { + + @Test(timeout = 3) + public void testPriority() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + Map properties1 = new HashMap<>(); + properties1.put(Symbol.getSymbol("priority"), 5); + AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1); + receiver1.flow(100); + + Map properties2 = new HashMap<>(); + properties2.put(Symbol.getSymbol("priority"), 50); + AmqpReceiver receiver2 = session.createReceiver(getQueueName(), null, false, false, properties2); + receiver2.flow(100); + + Map properties3 = new HashMap<>(); + properties3.put(Symbol.getSymbol("priority"), 10); + AmqpReceiver receiver3 = session.createReceiver(getQueueName(), null, false, false, properties3); + receiver3.flow(100); + + sendMessages(getQueueName(), 5); + + + for (int i = 0; i < 5; i++) { + AmqpMessage message1 = receiver1.receiveNoWait(); --- End diff -- I believe the test clients receiveNoWait only polls its local queue, so this might now likely sporadically fail due to racing the deliveries. I was only suggesting recieveNoWait as potential initial verification within the loop for non-receiving consumers, to be followed up or substituted by a more stringent final check. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246398611 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java --- @@ -104,6 +119,11 @@ public void decodeRest(final ActiveMQBuffer buffer) { filterString = buffer.readNullableSimpleString(); browseOnly = buffer.readBoolean(); requiresResponse = buffer.readBoolean(); + if (buffer.readableBytes() > 0) { --- End diff -- Does this comment cover to the bit around the safety of always sending the new additional data even to old servers? I can't tell if its covered. I think it should at the very least be commented what/when the encoding+decoding handling behaviour changed so folks can understand the implications later without heading to find past commits. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246403272 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java --- @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire.amq; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + + +import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.Before; +import org.junit.Test; + +public class QueueConsumerPriorityTest extends BasicOpenWireTest { + + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + this.makeSureCoreQueueExist("QUEUE.A"); + } + @Test + public void testQueueConsumerPriority() throws JMSException, InterruptedException { + connection.start(); + Session consumerLowPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session consumerHighPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(consumerHighPriority); + Session senderSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = "QUEUE.A"; + ActiveMQQueue low = new ActiveMQQueue(queueName + "?consumer.priority=1"); + MessageConsumer lowConsumer = consumerLowPriority.createConsumer(low); + + ActiveMQQueue high = new ActiveMQQueue(queueName + "?consumer.priority=2"); + MessageConsumer highConsumer = consumerLowPriority.createConsumer(high); + + ActiveMQQueue senderQueue = new ActiveMQQueue(queueName); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + for (int i = 0; i < 1000; i++) { + producer.send(msg); + assertNotNull("null on iteration: " + i, highConsumer.receive(1000)); + } + assertNull(lowConsumer.receive(2000)); --- End diff -- I don't personally think this is a case which warrants keeping poor behaviour 'for consistency' when there are various essentially equivalent checks/assertions the test could do which don't require wasting 2 seconds. Having maybe run it once to ensure it worked, I'd change it. The ActiveMQ 5 test suite is an even better example of a test suite so slow (due to things like this) that folks don't actually want to run it. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246383235 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java --- @@ -113,6 +133,7 @@ public int hashCode() { result = prime * result + (browseOnly ? 1231 : 1237); result = prime * result + ((filterString == null) ? 0 : filterString.hashCode()); result = prime * result + (int) (id ^ (id >>> 32)); + result = prime * result + priority; --- End diff -- I would uses `Integer::hashCode(priority)` that would `agitate` the value ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246382311 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import org.apache.activemq.artemis.core.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the the different collections used for each priority level. + * + * A supplier is required to provide the underlying collection needed when a new priority level is seen, + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics, + * if list, then list semantics. + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware. + */ +public class PriorityCollection extends AbstractCollection { + + private final Supplier> supplier; + private volatile PriorityHolder[] priorityHolders = newPrioritySetArrayInstance(0); + private volatile int size; + + private void setArray(PriorityHolder[] priorityHolders) { + this.priorityHolders = priorityHolders; + } + + private PriorityHolder[] getArray() { + return priorityHolders; + } + + + public PriorityCollection(Supplier> supplier) { + this.supplier = supplier; + } + + @SuppressWarnings("unchecked") + private static PriorityHolder[] newPrioritySetArrayInstance(int length) { + return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + public Set getPriorites() { + PriorityHolder[] snapshot = getArray(); + return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + Iterator[] iterators = getIterators(); + return new MultiIterator<>(iterators); + } + + private Iterator[] getIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + Iterator[] iterators = newIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = snapshot[i].getValues().iterator(); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static Iterator[] newIteratorArrayInstance(int length) { + return (Iterator[]) Array.newInstance(Iterator.class, length); + } + + public ResettableIterator resettableIterator() { + return new MultiResettableIterator(getResettableIterators()); + } + + private ResettableIterator[] getResettableIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + ResettableIterator[] iterators = newResettableIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray()); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static ResettableIterator[] newResettableIteratorArrayInstance(int length) { + return (Resett
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246337126 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import org.apache.activemq.artemis.core.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the the different collections used for each priority level. + * + * A supplier is required to provide the underlying collection needed when a new priority level is seen, + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics, + * if list, then list semantics. + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware. + */ +public class PriorityCollection extends AbstractCollection { + + private final Supplier> supplier; + private volatile PriorityHolder[] priorityHolders = newPrioritySetArrayInstance(0); + private volatile int size; + + private void setArray(PriorityHolder[] priorityHolders) { + this.priorityHolders = priorityHolders; + } + + private PriorityHolder[] getArray() { + return priorityHolders; + } + + + public PriorityCollection(Supplier> supplier) { + this.supplier = supplier; + } + + @SuppressWarnings("unchecked") + private static PriorityHolder[] newPrioritySetArrayInstance(int length) { + return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + public Set getPriorites() { + PriorityHolder[] snapshot = getArray(); + return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + Iterator[] iterators = getIterators(); + return new MultiIterator<>(iterators); + } + + private Iterator[] getIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + Iterator[] iterators = newIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = snapshot[i].getValues().iterator(); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static Iterator[] newIteratorArrayInstance(int length) { + return (Iterator[]) Array.newInstance(Iterator.class, length); + } + + public ResettableIterator resettableIterator() { + return new MultiResettableIterator(getResettableIterators()); + } + + private ResettableIterator[] getResettableIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + ResettableIterator[] iterators = newResettableIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray()); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static ResettableIterator[] newResettableIteratorArrayInstance(int length) { + return (Resett
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246338675 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java --- @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import org.apache.activemq.artemis.core.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the the different collections used for each priority level. + * + * A supplier is required to provide the underlying collection needed when a new priority level is seen, + * and the end behaviour is that of the underlying collection, e.g. if set add will follow set's add semantics, + * if list, then list semantics. + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware. + */ +public class PriorityCollection extends AbstractCollection { + + private final Supplier> supplier; + private volatile PriorityHolder[] priorityHolders = newPrioritySetArrayInstance(0); + private volatile int size; + + private void setArray(PriorityHolder[] priorityHolders) { + this.priorityHolders = priorityHolders; + } + + private PriorityHolder[] getArray() { + return priorityHolders; + } + + + public PriorityCollection(Supplier> supplier) { + this.supplier = supplier; + } + + @SuppressWarnings("unchecked") + private static PriorityHolder[] newPrioritySetArrayInstance(int length) { + return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + public Set getPriorites() { + PriorityHolder[] snapshot = getArray(); + return Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + Iterator[] iterators = getIterators(); + return new MultiIterator<>(iterators); + } + + private Iterator[] getIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + Iterator[] iterators = newIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = snapshot[i].getValues().iterator(); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static Iterator[] newIteratorArrayInstance(int length) { + return (Iterator[]) Array.newInstance(Iterator.class, length); + } + + public ResettableIterator resettableIterator() { + return new MultiResettableIterator(getResettableIterators()); + } + + private ResettableIterator[] getResettableIterators() { + PriorityHolder[] snapshot = this.getArray(); + int size = snapshot.length; + ResettableIterator[] iterators = newResettableIteratorArrayInstance(size); + for (int i = 0; i < size; i++) { + iterators[i] = new ArrayResettableIterator<>(snapshot[i].getValues().toArray()); + } + return iterators; + } + + @SuppressWarnings("unchecked") + private static ResettableIterator[] newResettableIteratorArrayInstance(int length) { + return (Resett
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246377289 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java --- @@ -2409,14 +2410,10 @@ private void deliver() { // Either the iterator is empty or the consumer is busy int noDelivery = 0; - int size = 0; - - int endPos = -1; - int handled = 0; long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT; - + consumers.reset(); --- End diff -- if any exception would be thrown before it is ok that `reset` won't be called? If not, better to wrap the whole logic with ìtry..finally` and `reset` ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246374340 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +/** + * This class's purpose is to hold the consumers. + * + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe, + * but also lock less for a read path, which is our HOT path. + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers. + * + * There can only be one resettable iterable view, + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset, + * + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator(). + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl implements QueueConsumers { + + private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator"); + private volatile ResettableIterator changedIterator; + private ResettableIterator currentIterator = consumers.resettableIterator(); + + @Override + public Set getPriorites() { + return consumers.getPriorites(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public T next() { + return currentIterator.next(); + } + + @Override + public QueueConsumers reset() { + if (changedIterator != null) { --- End diff -- I will just save `changedIterator` once and will perform a logic with a local value instead of volatile load twice ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246313269 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Iterator; + +/** + * Provides an Iterator that works over multiple underlying iterators. + * + * @param type of the class of the iterator. + */ +public class MultiIterator implements Iterator { + + private final Iterator[] iterators; + int index = -1; --- End diff -- `private` given that seems that isn't needed by a child ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246323500 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Iterator; + +/** + * Provides an Iterator that works over multiple underlying iterators. + * + * @param type of the class of the iterator. + */ +public class MultiIterator implements Iterator { + + private final Iterator[] iterators; + int index = -1; + + public MultiIterator(Iterator[] iterators) { --- End diff -- `I[]` ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246375651 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +/** + * This class's purpose is to hold the consumers. + * + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe, + * but also lock less for a read path, which is our HOT path. + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers. + * + * There can only be one resettable iterable view, + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset, + * + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator(). + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl implements QueueConsumers { + + private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator"); + private volatile ResettableIterator changedIterator; + private ResettableIterator currentIterator = consumers.resettableIterator(); + + @Override + public Set getPriorites() { + return consumers.getPriorites(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public T next() { + return currentIterator.next(); + } + + @Override + public QueueConsumers reset() { + if (changedIterator != null) { + currentIterator = changedIterator; + changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null); --- End diff -- What happen if the cas will fail? we ends with a reset() that is not nulling `changedIterator`. Given that we are just clearing the `changedIterator` to `null` a `lazySet(this, null) is enough. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246323998 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +/** + * Extends MultiIterator, adding the ability if the underlying iterators are resettable, then its self can reset. + * It achieves this by going back to the first iterator, and as moves to another iterator it resets it. + * + * @param type of the class of the iterator. + */ +public class MultiResettableIterator extends MultiIterator implements ResettableIterator { + + public MultiResettableIterator(ResettableIterator[] iterators) { + super(iterators); + } + + @Override + protected void moveTo(int index) { + super.moveTo(index); + if (index > -1) { + ((ResettableIterator) get(index)).reset(); --- End diff -- this cast could be avoided thanks to the changes on generics on `MultiIterator` ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246376376 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +/** + * This class's purpose is to hold the consumers. + * + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe, + * but also lock less for a read path, which is our HOT path. + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers. + * + * There can only be one resettable iterable view, + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset, + * + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator(). + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl implements QueueConsumers { + + private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator"); + private volatile ResettableIterator changedIterator; + private ResettableIterator currentIterator = consumers.resettableIterator(); + + @Override + public Set getPriorites() { + return consumers.getPriorites(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public T next() { + return currentIterator.next(); + } + + @Override + public QueueConsumers reset() { + if (changedIterator != null) { + currentIterator = changedIterator; + changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null); + } else { + currentIterator.reset(); + } + return this; + } + + @Override + public boolean add(T t) { + boolean result = consumers.add(t); + if (result) { + changedIteratorFieldUpdater.set(this, consumers.resettableIterator()); + } + return result; + } + + @Override + public boolean remove(T t) { + boolean result = consumers.remove(t); + if (result) { + changedIteratorFieldUpdater.set(this, consumers.resettableIterator()); --- End diff -- `lazeSet` is enough for single-writer/single-threaded semantic ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246377787 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java --- @@ -2497,45 +2484,36 @@ private void deliver() { handled++; - + consumers.reset(); continue; } if (logger.isTraceEnabled()) { logger.trace("Queue " + this.getName() + " is delivering reference " + ref); } - // If a group id is set, then this overrides the consumer chosen round-robin - - SimpleString groupID = extractGroupID(ref); - - if (groupID != null) { - groupConsumer = groups.get(groupID); + final SimpleString groupID = extractGroupID(ref); + groupConsumer = getGroupConsumer(groupConsumer, groupID); - if (groupConsumer != null) { - consumer = groupConsumer; - } - } - - if (exclusive && redistributor == null) { - consumer = consumerList.get(0).consumer; + if (groupConsumer != null) { + consumer = groupConsumer; } HandleStatus status = handle(ref, consumer); if (status == HandleStatus.HANDLED) { - deliveriesInTransit.countUp(); - - handledconsumer = consumer; - - removeMessageReference(holder, ref); - if (redistributor == null) { handleMessageGroup(ref, consumer, groupConsumer, groupID); } + deliveriesInTransit.countUp(); + + + removeMessageReference(holder, ref); + handledconsumer = consumer; handled++; + consumers.reset(); --- End diff -- if `removeMessageReference` would throw any exeption is fine to have `consumers.reset` not called? if not, uses `try...finally` ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246321398 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Iterator; + +/** + * Provides an Iterator that works over multiple underlying iterators. + * + * @param type of the class of the iterator. + */ +public class MultiIterator implements Iterator { --- End diff -- `MultiIterator, T> implements Iterator` It should avoid casting on children while manipulating `iterators` type ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246376356 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +/** + * This class's purpose is to hold the consumers. + * + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe, + * but also lock less for a read path, which is our HOT path. + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers. + * + * There can only be one resettable iterable view, + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset, + * + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator(). + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl implements QueueConsumers { + + private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator"); + private volatile ResettableIterator changedIterator; + private ResettableIterator currentIterator = consumers.resettableIterator(); + + @Override + public Set getPriorites() { + return consumers.getPriorites(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public T next() { + return currentIterator.next(); + } + + @Override + public QueueConsumers reset() { + if (changedIterator != null) { + currentIterator = changedIterator; + changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null); + } else { + currentIterator.reset(); + } + return this; + } + + @Override + public boolean add(T t) { + boolean result = consumers.add(t); + if (result) { + changedIteratorFieldUpdater.set(this, consumers.resettableIterator()); --- End diff -- `lazeSet` is enough for single-writer/single-threaded semantic ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246323602 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Iterator; + +/** + * Provides an Iterator that works over multiple underlying iterators. + * + * @param type of the class of the iterator. + */ +public class MultiIterator implements Iterator { + + private final Iterator[] iterators; + int index = -1; + + public MultiIterator(Iterator[] iterators) { + this.iterators = iterators; + } + + @Override + public boolean hasNext() { + while (true) { + if (index != -1) { +Iterator currentIterator = get(index); +if (currentIterator.hasNext()) { + return true; +} + } + int next = index + 1; + if (next < iterators.length) { +moveTo(next); + } else { +return false; + } + } + } + + @Override + public T next() { + while (true) { + if (index != -1) { +Iterator currentIterator = get(index); +if (currentIterator.hasNext()) { + return currentIterator.next(); +} + } + int next = index + 1; + if (next < iterators.length) { +moveTo(next); + } else { +return null; + } + } + } + + protected void moveTo(int index) { + this.index = index; + } + + protected Iterator get(int index) { --- End diff -- ```java protected I get(int index) ``` ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246322551 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +import java.util.Iterator; + +/** + * Provides an Iterator that works over multiple underlying iterators. + * + * @param type of the class of the iterator. + */ +public class MultiIterator implements Iterator { + + private final Iterator[] iterators; --- End diff -- `I[]`: comments above ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246344280 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java --- @@ -84,6 +94,10 @@ public void setFilterString(SimpleString filterString) { this.filterString = filterString; } + public void setPriority(byte priority) { --- End diff -- this modifier is needed? I can't see it to be called by anyone: if not `priority` filed could be declared as `final` ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246323811 --- Diff: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils.collections; + +/** + * Extends MultiIterator, adding the ability if the underlying iterators are resettable, then its self can reset. + * It achieves this by going back to the first iterator, and as moves to another iterator it resets it. + * + * @param type of the class of the iterator. + */ +public class MultiResettableIterator extends MultiIterator implements ResettableIterator { --- End diff -- ```java public class MultiResettableIterator extends MultiIterator, T> implements ResettableIterator { ``` ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246376178 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.PriorityAware; +import org.apache.activemq.artemis.utils.collections.PriorityCollection; +import org.apache.activemq.artemis.utils.collections.ResettableIterator; + +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +/** + * This class's purpose is to hold the consumers. + * + * CopyOnWriteArraySet is used as the underlying collection to the PriorityCollection, as it is concurrent safe, + * but also lock less for a read path, which is our HOT path. + * Also it was the underlying collection previously used in QueueImpl, before we abstracted it out to support priority consumers. + * + * There can only be one resettable iterable view, + * A new iterable view is created on modification, this is to keep the read HOT path performent, BUT + * the iterable view changes only after reset so changes in the underlying collection are only seen after a reset, + * + * All other iterators created by iterators() method are not reset-able and are created on delegating iterator(). + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl implements QueueConsumers { + + private final PriorityCollection consumers = new PriorityCollection<>(CopyOnWriteArraySet::new); + private final Collection unmodifiableConsumers = Collections.unmodifiableCollection(consumers); + private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater = AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, ResettableIterator.class, "changedIterator"); + private volatile ResettableIterator changedIterator; + private ResettableIterator currentIterator = consumers.resettableIterator(); + + @Override + public Set getPriorites() { + return consumers.getPriorites(); + } + + @Override + public boolean hasNext() { + return currentIterator.hasNext(); + } + + @Override + public T next() { + return currentIterator.next(); + } + + @Override + public QueueConsumers reset() { + if (changedIterator != null) { + currentIterator = changedIterator; + changedIteratorFieldUpdater.compareAndSet(this, changedIterator, null); + } else { + currentIterator.reset(); --- End diff -- I suppose that `reset` is safe to be called just by one thread at time, if not, it would be complex because `currentIterator` could be changed indipendently ---
[GitHub] activemq-artemis pull request #1793: ARTEMIS-1498: Openwire internal headers...
Github user RaiSaurabh closed the pull request at: https://github.com/apache/activemq-artemis/pull/1793 ---
[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...
GitHub user franz1981 opened a pull request: https://github.com/apache/activemq-artemis/pull/2494 ARTEMIS-2224 Reduce contention on LivePageCacheImpl It includes: - **lock-free LivePageCache + tests**: LivePageCacheImpl has been reimplemented to be lock-free, multi-producer and multi-consumer in any of its operations. - **Avoid unnecessary page cache queries on ack TX**: PageSubscriptionImpl::ackTx is already performing a counter update using the message persistent size: the size can be reused on PagePosition::setPersistentSize, avoiding to query the page cache just to compute it. You can merge this pull request into a Git repository by running: $ git pull https://github.com/franz1981/activemq-artemis lock-free-paging Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2494.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2494 ---
[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...
Github user onlyMIT commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2491#discussion_r246247094 --- Diff: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java --- @@ -117,14 +118,11 @@ boolean getStopped() { } boolean isClean() { - return isClean; + return clean; } - void setIsClean(boolean isClean) throws Exception { - this.isClean = isClean; - if (isClean) { - clean(); - } + void setClean(boolean clean) throws Exception { + this.clean = clean; --- End diff -- Look good to me! ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140306 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Test various behaviors of AMQP receivers with the broker. + */ +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport { + + @Test(timeout = 3) + public void testPriority() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + Map properties1 = new HashMap<>(); + properties1.put(Symbol.getSymbol("priority"), 50); + AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1); + receiver1.flow(100); + + Map properties2 = new HashMap<>(); + properties2.put(Symbol.getSymbol("priority"), 10); + AmqpReceiver receiver2 = session.createReceiver(getQueueName(), null, false, false, properties2); + receiver2.flow(100); + + Map properties3 = new HashMap<>(); + properties3.put(Symbol.getSymbol("priority"), 5); + AmqpReceiver receiver3 = session.createReceiver(getQueueName(), null, false, false, properties3); + receiver3.flow(100); + + sendMessages(getQueueName(), 5); + + + for (int i = 0; i < 5; i++) { + AmqpMessage message1 = receiver1.receive(250, TimeUnit.MILLISECONDS); + AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS); + AmqpMessage message3 = receiver3.receive(250, TimeUnit.MILLISECONDS); + assertNotNull("did not receive message first time", message1); + assertEquals("MessageID:" + i, message1.getMessageId()); + message1.accept(); + assertNull("message is not meant to goto lower priority receiver", message2); + assertNull("message is not meant to goto lower priority receiver", message3); + } + + //Close the high priority receiver + receiver1.close(); + + sendMessages(getQueueName(), 5); + + //Check messages now goto next priority receiver + for (int i = 0; i < 5; i++) { + AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS); + AmqpMessage message3 = receiver3.receive(250, TimeUnit.MILLISECONDS); --- End diff -- changed to receiveNoWait() ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140214 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Test various behaviors of AMQP receivers with the broker. + */ +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport { + + @Test(timeout = 3) + public void testPriority() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + Map properties1 = new HashMap<>(); + properties1.put(Symbol.getSymbol("priority"), 50); --- End diff -- changed up the ordering in this test also. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140041 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Test various behaviors of AMQP receivers with the broker. + */ +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport { + + @Test(timeout = 3) + public void testPriority() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + Map properties1 = new HashMap<>(); + properties1.put(Symbol.getSymbol("priority"), 50); + AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1); + receiver1.flow(100); + + Map properties2 = new HashMap<>(); + properties2.put(Symbol.getSymbol("priority"), 10); + AmqpReceiver receiver2 = session.createReceiver(getQueueName(), null, false, false, properties2); + receiver2.flow(100); + + Map properties3 = new HashMap<>(); + properties3.put(Symbol.getSymbol("priority"), 5); + AmqpReceiver receiver3 = session.createReceiver(getQueueName(), null, false, false, properties3); + receiver3.flow(100); + + sendMessages(getQueueName(), 5); + + + for (int i = 0; i < 5; i++) { + AmqpMessage message1 = receiver1.receive(250, TimeUnit.MILLISECONDS); + AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS); --- End diff -- changed to receiveNoWait ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135818 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.client; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +/** + * Exclusive Test + */ +public class ConsumerPriorityTest extends JMSTestBase { + + private SimpleString queueName = SimpleString.toSimpleString("jms.consumer.priority.queue"); --- End diff -- now using getName ... again nice nice ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135697 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java --- @@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext protonSender, return consumer; } + private int getPriority(Map properties) { + Integer value = properties == null ? null : (Integer) properties.get(PRIORITY); --- End diff -- Have changed if you can recheck to make sure i understood. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135542 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java --- @@ -52,6 +57,7 @@ public String toString() { StringBuffer buff = new StringBuffer(getParentString()); buff.append(", queueName=" + queueName); buff.append(", filterString=" + filterString); + buff.append(", priority=" + priority); --- End diff -- done ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246133832 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.client; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +/** + * Exclusive Test + */ +public class ConsumerPriorityTest extends JMSTestBase { + + private SimpleString queueName = SimpleString.toSimpleString("jms.consumer.priority.queue"); --- End diff -- Nice i didnt know about that in the parent class. will change to use this.. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246132135 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java --- @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire.amq; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + + +import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.Before; +import org.junit.Test; + +public class QueueConsumerPriorityTest extends BasicOpenWireTest { + + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + this.makeSureCoreQueueExist("QUEUE.A"); + } + @Test + public void testQueueConsumerPriority() throws JMSException, InterruptedException { + connection.start(); + Session consumerLowPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session consumerHighPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(consumerHighPriority); + Session senderSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = "QUEUE.A"; + ActiveMQQueue low = new ActiveMQQueue(queueName + "?consumer.priority=1"); + MessageConsumer lowConsumer = consumerLowPriority.createConsumer(low); + + ActiveMQQueue high = new ActiveMQQueue(queueName + "?consumer.priority=2"); + MessageConsumer highConsumer = consumerLowPriority.createConsumer(high); + + ActiveMQQueue senderQueue = new ActiveMQQueue(queueName); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + for (int i = 0; i < 1000; i++) { + producer.send(msg); + assertNotNull("null on iteration: " + i, highConsumer.receive(1000)); + } + assertNull(lowConsumer.receive(2000)); --- End diff -- this is the original test from ActiveMQ5 i was trying to keep this test as much un-touched as possible to ensure behavior is the same. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246128551 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Test various behaviors of AMQP receivers with the broker. + */ +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport { + + @Test(timeout = 3) + public void testPriority() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + Map properties1 = new HashMap<>(); + properties1.put(Symbol.getSymbol("priority"), 50); --- End diff -- This is actually tested on the queueconsumerimpl test. But agree we can do same here ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246128143 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java --- @@ -104,6 +119,11 @@ public void decodeRest(final ActiveMQBuffer buffer) { filterString = buffer.readNullableSimpleString(); browseOnly = buffer.readBoolean(); requiresResponse = buffer.readBoolean(); + if (buffer.readableBytes() > 0) { --- End diff -- This is typical pattern used for adding safely a new field that can be either nullable or defaultable. Used many times over. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r246127864 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java --- @@ -52,6 +57,7 @@ public String toString() { StringBuffer buff = new StringBuffer(getParentString()); buff.append(", queueName=" + queueName); buff.append(", filterString=" + filterString); + buff.append(", priority=" + priority); --- End diff -- Makes sense ---
[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...
Github user jbertram commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2491#discussion_r246123082 --- Diff: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java --- @@ -117,14 +118,11 @@ boolean getStopped() { } boolean isClean() { - return isClean; + return clean; } - void setIsClean(boolean isClean) throws Exception { - this.isClean = isClean; - if (isClean) { - clean(); - } + void setClean(boolean clean) throws Exception { + this.clean = clean; --- End diff -- I pushed an update to address the issue you identified. Thanks! ---
[GitHub] activemq-artemis pull request #2493: ARTEMIS-2223 when a new consumer is cre...
Github user onlyMIT closed the pull request at: https://github.com/apache/activemq-artemis/pull/2493 ---
[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...
Github user onlyMIT commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2491#discussion_r246095670 --- Diff: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java --- @@ -117,14 +118,11 @@ boolean getStopped() { } boolean isClean() { - return isClean; + return clean; } - void setIsClean(boolean isClean) throws Exception { - this.isClean = isClean; - if (isClean) { - clean(); - } + void setClean(boolean clean) throws Exception { + this.clean = clean; --- End diff -- @jbertram in the getSessionState method.Only clear stateï¼not call 'clean()' method. In fact, the queue is not cleaned up. I use the code for the âpahoâ test. The first consumer "cleanSession=false", using a different clientID to open a producer to send a message. Close the producer and consumer, use the same clientID and cleanSession = true" to open the second consumer and find that the consumer will consume the legacy message in the queueãSo I suspect that there is a problem with the test code. I am always looking for why my test results will consume the legacy messages in the queue, and your test results will notã After seeing your information, I re-reviewed the code and found that the test code did not have any problems. What is causing my doubts is that because of your change, when cleanSession=true, only the MQTTSessionState is cleaned up, the queue still exists, and the legacy messages in the queue are consumed when resubscribing. Can close [#2493 ](https://github.com/apache/activemq-artemis/pull/2493) . I think you need to review your changeã ---
[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...
Github user jbertram commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2491#discussion_r246075935 --- Diff: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java --- @@ -117,14 +118,11 @@ boolean getStopped() { } boolean isClean() { - return isClean; + return clean; } - void setIsClean(boolean isClean) throws Exception { - this.isClean = isClean; - if (isClean) { - clean(); - } + void setClean(boolean clean) throws Exception { + this.clean = clean; --- End diff -- The expectation of a "setter" method is simply to _set_ a variable and nothing more. The additional logic in the `setClean` method is not intuitive which is why I removed it and put it in the `getSessionState` method. ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r246058995 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java --- @@ -730,22 +793,29 @@ public int deliverMessage(MessageReference messageReference, int deliveryCount, if (preSettle) { // Presettled means the client implicitly accepts any delivery we send it. - sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + try { + sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + } catch (Exception e) { + log.debug(e.getMessage(), e); + } delivery.settle(); } else { sender.advance(); } connection.flush(); } finally { -connection.unlock(); +synchronized (creditsLock) { + pending.decrementAndGet(); +} +if (releaseRequired) { + ((NettyReadable) sendBuffer).getByteBuf().release(); +} } + } catch (Exception e) { + log.warn(e.getMessage(), e); - return size; - } finally { - if (releaseRequired) { -((NettyReadable) sendBuffer).getByteBuf().release(); - } + // important todo: Error treatment --- End diff -- I'm working on it. I'm out today on a meeting... will be done tomorrow (Wed) ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r245974624 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Test various behaviors of AMQP receivers with the broker. + */ +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport { + + @Test(timeout = 3) + public void testPriority() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + Map properties1 = new HashMap<>(); + properties1.put(Symbol.getSymbol("priority"), 50); --- End diff -- I'd suggest creating consumers with priorities out of order (e.g highest in middle), so they arent simply registered in sequence, as otherwise a simple failure to round-robin delivery attempts (given every receiver has enough credit to receive all messages) might also lead to the expected result even without any priority handling consideration. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r245973668 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Test various behaviors of AMQP receivers with the broker. + */ +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport { + + @Test(timeout = 3) + public void testPriority() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + Map properties1 = new HashMap<>(); + properties1.put(Symbol.getSymbol("priority"), 50); + AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1); + receiver1.flow(100); + + Map properties2 = new HashMap<>(); + properties2.put(Symbol.getSymbol("priority"), 10); + AmqpReceiver receiver2 = session.createReceiver(getQueueName(), null, false, false, properties2); + receiver2.flow(100); + + Map properties3 = new HashMap<>(); + properties3.put(Symbol.getSymbol("priority"), 5); + AmqpReceiver receiver3 = session.createReceiver(getQueueName(), null, false, false, properties3); + receiver3.flow(100); + + sendMessages(getQueueName(), 5); + + + for (int i = 0; i < 5; i++) { + AmqpMessage message1 = receiver1.receive(250, TimeUnit.MILLISECONDS); + AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS); --- End diff -- Burning 250ms twice per loop seems excessive. There is a receiveNoWait that could be used for initial verification nothing arrived, and/or a small final timed wait could be done outside the loop afterwards. Alternatively, pullImmediate() would avoid unnecessary waiting. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r245968414 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.client; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.jms.client.ActiveMQDestination; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +/** + * Exclusive Test + */ +public class ConsumerPriorityTest extends JMSTestBase { + + private SimpleString queueName = SimpleString.toSimpleString("jms.consumer.priority.queue"); --- End diff -- Rather than hard coding a shared name, using the test name for the queue name is nice as it isolates different tests and makes the relationship clear, sometimes makes it easier to work on issues later with particular tests. There is a test name rule in the parent class, and a getName() method that can be used with it. ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r245955337 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java --- @@ -104,6 +119,11 @@ public void decodeRest(final ActiveMQBuffer buffer) { filterString = buffer.readNullableSimpleString(); browseOnly = buffer.readBoolean(); requiresResponse = buffer.readBoolean(); + if (buffer.readableBytes() > 0) { --- End diff -- I assume this is to allow for old clients that don't send this value. Would a more specific version check be clearer here for later reference? Related, I'm guessing other changes already made for 2.7.0 have updated the version info since it doesn't look to change here? Also, is the reverse case safe, does an older server failing to read the additional value (seemingly always sent now) have potential to lead to any issues on older servers, i.e how might the buffer continue to be used later if at all? Should the client omit the value for older servers? (Or does the presumed version change prevent the new client working with the old server anyway? I don't know how that stuff is handled, just commenting from reading the diff here). ---
[GitHub] activemq-artemis pull request #2490: V2 196
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r245973707 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Symbol; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +/** + * Test various behaviors of AMQP receivers with the broker. + */ +public class AmqpReceiverPriorityTest extends AmqpClientTestSupport { + + @Test(timeout = 3) + public void testPriority() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + Map properties1 = new HashMap<>(); + properties1.put(Symbol.getSymbol("priority"), 50); + AmqpReceiver receiver1 = session.createReceiver(getQueueName(), null, false, false, properties1); + receiver1.flow(100); + + Map properties2 = new HashMap<>(); + properties2.put(Symbol.getSymbol("priority"), 10); + AmqpReceiver receiver2 = session.createReceiver(getQueueName(), null, false, false, properties2); + receiver2.flow(100); + + Map properties3 = new HashMap<>(); + properties3.put(Symbol.getSymbol("priority"), 5); + AmqpReceiver receiver3 = session.createReceiver(getQueueName(), null, false, false, properties3); + receiver3.flow(100); + + sendMessages(getQueueName(), 5); + + + for (int i = 0; i < 5; i++) { + AmqpMessage message1 = receiver1.receive(250, TimeUnit.MILLISECONDS); + AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS); + AmqpMessage message3 = receiver3.receive(250, TimeUnit.MILLISECONDS); + assertNotNull("did not receive message first time", message1); + assertEquals("MessageID:" + i, message1.getMessageId()); + message1.accept(); + assertNull("message is not meant to goto lower priority receiver", message2); + assertNull("message is not meant to goto lower priority receiver", message3); + } + + //Close the high priority receiver + receiver1.close(); + + sendMessages(getQueueName(), 5); + + //Check messages now goto next priority receiver + for (int i = 0; i < 5; i++) { + AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS); + AmqpMessage message3 = receiver3.receive(250, TimeUnit.MILLISECONDS); --- End diff -- As above. ---