[GitHub] activemq-artemis pull request #2490: V2 196
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2490#discussion_r245972322 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java --- @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.integration.openwire.amq; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + + +import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.Before; +import org.junit.Test; + +public class QueueConsumerPriorityTest extends BasicOpenWireTest { + + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + this.makeSureCoreQueueExist("QUEUE.A"); + } + @Test + public void testQueueConsumerPriority() throws JMSException, InterruptedException { + connection.start(); + Session consumerLowPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session consumerHighPriority = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(consumerHighPriority); + Session senderSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = "QUEUE.A"; + ActiveMQQueue low = new ActiveMQQueue(queueName + "?consumer.priority=1"); + MessageConsumer lowConsumer = consumerLowPriority.createConsumer(low); + + ActiveMQQueue high = new ActiveMQQueue(queueName + "?consumer.priority=2"); + MessageConsumer highConsumer = consumerLowPriority.createConsumer(high); + + ActiveMQQueue senderQueue = new ActiveMQQueue(queueName); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + for (int i = 0; i < 1000; i++) { + producer.send(msg); + assertNotNull("null on iteration: " + i, highConsumer.receive(1000)); + } + assertNull(lowConsumer.receive(2000)); --- End diff -- Would a receiveNoWait (either inside or outside the loop), as the other tests use, be nicer than burning 2 seconds? Slow tests are a key reason that eventually no one wants to run the tests :) ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r245960681 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java --- @@ -730,22 +793,29 @@ public int deliverMessage(MessageReference messageReference, int deliveryCount, if (preSettle) { // Presettled means the client implicitly accepts any delivery we send it. - sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + try { + sessionSPI.ack(null, brokerConsumer, messageReference.getMessage()); + } catch (Exception e) { + log.debug(e.getMessage(), e); + } delivery.settle(); } else { sender.advance(); } connection.flush(); } finally { -connection.unlock(); +synchronized (creditsLock) { + pending.decrementAndGet(); +} +if (releaseRequired) { + ((NettyReadable) sendBuffer).getByteBuf().release(); +} } + } catch (Exception e) { + log.warn(e.getMessage(), e); - return size; - } finally { - if (releaseRequired) { -((NettyReadable) sendBuffer).getByteBuf().release(); - } + // important todo: Error treatment --- End diff -- Did you look over this? ---
[GitHub] activemq-artemis pull request #2493: ARTEMIS-2223 when a new consumer is cre...
GitHub user onlyMIT opened a pull request: https://github.com/apache/activemq-artemis/pull/2493 ARTEMIS-2223 when a new consumer is created, no subscription is called. In the 'MQTTTest.testCleanSession()' test method, when a new consumer is created, subscribe is never called. Consumers need to subscribe before they can consume messages. You can merge this pull request into a Git repository by running: $ git pull https://github.com/onlyMIT/activemq-artemis ARTEMIS-2223 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2493.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2493 commit 2a6d230c4c8cd739090c5da58754a005d62e1a40 Author: onlyMIT Date: 2019-01-08T09:39:37Z ARTEMIS-2223 when a new consumer is created, no subscription is called. In the 'MQTTTest.testCleanSession()' test method, when a new consumer is created, subscribe is never called. Consumers need to subscribe before they can consume messages. ---
[GitHub] activemq-artemis pull request #2492: ARTEMIS-2222 why the position remains u...
GitHub user CNNJYB opened a pull request: https://github.com/apache/activemq-artemis/pull/2492 ARTEMIS- why the position remains unchanged if ignored is set to true I am a bit confused about this: when CursorIterator:next is called during queue depage and ignored is set to true, why does the position remain unchanged? if (!ignored) { position = message.getPosition(); } For example, suppose the client sends some messages to the topic subscriber ta (this topic has two subscribers, ta and tb). Every time tb depages, the consecutive PagePositions whose ignored flag is set to true are traversed again. You can merge this pull request into a Git repository by running: $ git pull https://github.com/CNNJYB/activemq-artemis dev-CursorIterator-moveNext-ignored Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2492.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2492 commit 49f8bcd99640209c825c22e54a32c872850cc000 Author: andytaylor Date: 2018-10-24T10:21:52Z ARTEMIS-2144 - tx begin failure in ra doesn't get cleaned up https://issues.apache.org/jira/browse/ARTEMIS-2144 commit 7ae39f7d7218e7c5a93eaecb8f24af3d6186e6f3 Author: yb <17061955@...> Date: 2019-01-08T09:14:18Z ARTEMIS- why the position remains unchanged if ignored is set to true ---
[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...
Github user onlyMIT commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2491#discussion_r245879352 --- Diff: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java --- @@ -117,14 +118,11 @@ boolean getStopped() { } boolean isClean() { - return isClean; + return clean; } - void setIsClean(boolean isClean) throws Exception { - this.isClean = isClean; - if (isClean) { - clean(); - } + void setClean(boolean clean) throws Exception { + this.clean = clean; --- End diff -- It is necessary to call the "clean()" method to clean up old session information when creating a connection. Otherwise, if the previous MQTT consumer connected with cleanSession set to false and the newly connected MQTT consumer sets cleanSession to true, the messages left in the old queue will be consumed, which is not allowed. I think this is why "clean()" is called in the "setIsClean(boolean isClean)" method. ---
[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...
Github user onlyMIT commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2491#discussion_r245878622 --- Diff: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java --- @@ -117,14 +118,11 @@ boolean getStopped() { } boolean isClean() { - return isClean; + return clean; } - void setIsClean(boolean isClean) throws Exception { - this.isClean = isClean; - if (isClean) { - clean(); - } + void setClean(boolean clean) throws Exception { + this.clean = clean; --- End diff -- It is necessary to call the "clean()" method to clean up old session information when creating a connection. Otherwise, if the previous MQTT consumer connected with cleanSession set to false and the newly connected MQTT consumer sets cleanSession to true, the messages left in the old queue will be consumed, which is not allowed. I think this is why "clean()" is called in the "setIsClean(boolean isClean)" method. ---
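The concern in the comment above can be illustrated with a small, self-contained sketch. It is not the MQTTSession code: the class `CleanSessionSketch`, its `retain` and `poll` methods and the in-memory queue are hypothetical stand-ins, assumed only to show why setting the clean flag without clearing old state would let a clean session see messages from an earlier non-clean one.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of per-client session state; not the Artemis MQTTSession API.
final class CleanSessionSketch {

   // Messages left behind by an earlier session that used cleanSession=false.
   private final Queue<String> retained = new ArrayDeque<>();

   private boolean clean;

   void retain(String message) {
      retained.add(message);
   }

   // Mirrors the behaviour of the removed setIsClean: setting the flag also discards old state.
   void setClean(boolean clean) {
      this.clean = clean;
      if (clean) {
         retained.clear();
      }
   }

   boolean isClean() {
      return clean;
   }

   // Returns the next retained message, or null if there is none.
   String poll() {
      return retained.poll();
   }
}
```

With this shape, a client that reconnects with the flag set to true finds no retained messages, while a reconnect with the flag set to false still sees them.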
[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...
GitHub user jbertram opened a pull request: https://github.com/apache/activemq-artemis/pull/2491 ARTEMIS-2217 remove state on clean MQTT session disconnect You can merge this pull request into a Git repository by running: $ git pull https://github.com/jbertram/activemq-artemis ARTEMIS-2217 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2491.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2491 commit a4d0cf9ab42dd5076b78fb47e3425d11c917db53 Author: Justin Bertram Date: 2018-12-29T08:53:04Z ARTEMIS-2217 remove state on clean MQTT session disconnect ---
[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2467#discussion_r245777806 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java --- @@ -359,59 +371,77 @@ public boolean flowControl(ReadyListener readyListener) { @Override public void onRemoteOpen(Connection connection) throws Exception { - lock(); + handler.requireHandler(); try { - try { -initInternal(); - } catch (Exception e) { -log.error("Error init connection", e); - } - if (!validateConnection(connection)) { -connection.close(); - } else { -connection.setContext(AMQPConnectionContext.this); -connection.setContainer(containerId); -connection.setProperties(connectionProperties); - connection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); -connection.open(); - } - } finally { - unlock(); + initInternal(); + } catch (Exception e) { + log.error("Error init connection", e); + } + if (!validateConnection(connection)) { + connection.close(); + } else { + connection.setContext(AMQPConnectionContext.this); + connection.setContainer(containerId); + connection.setProperties(connectionProperties); + connection.setOfferedCapabilities(getConnectionCapabilitiesOffered()); + connection.open(); } initialise(); - /* - * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections - * but its here in case we add support for outbound connections. - * */ + /* + * This can be null which is in effect an empty map, also we really don't need to check this for in bound connections + * but its here in case we add support for outbound connections. 
+ * */ if (connection.getRemoteProperties() == null || !connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) { long nextKeepAliveTime = handler.tick(true); if (nextKeepAliveTime != 0 && scheduledPool != null) { -scheduledPool.schedule(new Runnable() { - @Override - public void run() { - Long rescheduleAt = handler.tick(false); - if (rescheduleAt == null) { - // this mean tick could not acquire a lock, we will just retry in 10 milliseconds. - scheduledPool.schedule(this, 10, TimeUnit.MILLISECONDS); - } else if (rescheduleAt != 0) { - scheduledPool.schedule(this, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS); - } - } -}, (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS); +scheduledPool.schedule(new ScheduleRunnable(), (nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS); } } } + class TickerRunnable implements Runnable { + + final ScheduleRunnable scheduleRunnable; + + TickerRunnable(ScheduleRunnable scheduleRunnable) { + this.scheduleRunnable = scheduleRunnable; + } + + @Override + public void run() { + try { +Long rescheduleAt = handler.tick(false); +if (rescheduleAt == null) { + // this mean tick could not acquire a lock, we will just retry in 10 milliseconds. + scheduledPool.schedule(scheduleRunnable, 10, TimeUnit.MILLISECONDS); +} else if (rescheduleAt != 0) { + scheduledPool.schedule(scheduleRunnable, rescheduleAt - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS); +} + } catch (Exception e) { +log.warn(e.getMessage(), e); + } --- End diff -- I'm removing the catch here. I don't think we need to use specific loggers on generic handlers like this though. it was a generic handler.. but it's being removed. ---
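The rescheduling rule in the quoted TickerRunnable has three cases, which can be sketched as a pure function. This is an illustration under assumptions, not the AMQPConnectionContext code: the class `TickRescheduleSketch` and the method `nextDelayMillis` are hypothetical names, and the current time is passed in as a parameter so the logic can be checked without a scheduler.

```java
import java.util.OptionalLong;

// Hypothetical helper that isolates the delay computation from the quoted diff.
final class TickRescheduleSketch {

   // Retry interval used in the diff when tick could not acquire the lock.
   static final long RETRY_MILLIS = 10;

   /**
    * @param rescheduleAt value returned by tick: null (lock not acquired),
    *                     0 (nothing to schedule) or an absolute deadline in milliseconds
    * @param nowMillis    current time on the same clock as the deadline
    * @return the delay to schedule with, or empty if no reschedule is needed
    */
   static OptionalLong nextDelayMillis(Long rescheduleAt, long nowMillis) {
      if (rescheduleAt == null) {
         return OptionalLong.of(RETRY_MILLIS);
      }
      if (rescheduleAt != 0) {
         return OptionalLong.of(rescheduleAt - nowMillis);
      }
      return OptionalLong.empty();
   }
}
```

A caller would pass the result to `ScheduledExecutorService.schedule(...)` when it is present. A deadline that has already passed yields a negative delay, which the executor treats as a request for immediate execution.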
[GitHub] activemq-artemis pull request #2475: ARTEMIS-2144 - tx begin failure in ra d...
Github user asfgit closed the pull request at: https://github.com/apache/activemq-artemis/pull/2475 ---
[GitHub] activemq-artemis pull request #2490: V2 196
GitHub user michaelandrepearce opened a pull request: https://github.com/apache/activemq-artemis/pull/2490 V2 196 @franz1981 here is an alternative so that we don't have to keep our own copy of CopyOnWriteArrayList. It does mean that on adding or removing a consumer we have to invoke toArray, which causes a copy, but this is not on the hot path, so I think we should be good, and it avoids us having to clone a JVM class. You can merge this pull request into a Git repository by running: $ git pull https://github.com/michaelandrepearce/activemq-artemis V2-196 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2490.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2490 commit d731ffe7288cb857fef1b97deff4b7dc18aeb6d7 Author: Michael André Pearce Date: 2018-12-31T13:22:02Z ARTEMIS-196 Implement Consumer Priority Add consumer priority support Includes refactor of consumer iterating in QueueImpl to its own logical class, to be able to implement. Add OpenWire JMS Test - taken from ActiveMQ5 Add Core JMS Test Add AMQP Test Add Docs commit b0c775840fc98b5d3f5f3485802de3270c614d9a Author: Michael André Pearce Date: 2019-01-05T09:48:24Z Extract ---
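The trade-off described in this pull request (an extra toArray copy on add or remove, in exchange for not cloning a JDK class) might look roughly like the sketch below. The class `SnapshotListSketch` and its methods are hypothetical and are not taken from the PR; the sketch only assumes that mutations are rare and that reads need a stable array to iterate over.

```java
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical wrapper: a stock CopyOnWriteArrayList plus a published array snapshot.
final class SnapshotListSketch<T> {

   private final CopyOnWriteArrayList<T> list = new CopyOnWriteArrayList<>();

   // Readers only ever see a fully built array.
   private volatile Object[] snapshot = new Object[0];

   // Mutations are serialized so the list and the snapshot stay in step.
   synchronized boolean add(T item) {
      boolean changed = list.add(item);
      snapshot = list.toArray(); // the extra copy, paid only on mutation
      return changed;
   }

   synchronized boolean remove(T item) {
      boolean changed = list.remove(item);
      if (changed) {
         snapshot = list.toArray();
      }
      return changed;
   }

   // Hot path: one volatile read, then plain array iteration without locking.
   @SuppressWarnings("unchecked")
   void forEach(Consumer<? super T> action) {
      Object[] current = snapshot;
      for (Object element : current) {
         action.accept((T) element);
      }
   }
}
```

Because `forEach` reads the snapshot reference once, a concurrent add or remove cannot change the array it is iterating; it sees either the old contents or the new ones.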
[GitHub] activemq-artemis pull request #2489: ARTEMIS-2220 Fix PageCursorStressTest::...
GitHub user franz1981 opened a pull request: https://github.com/apache/activemq-artemis/pull/2489 ARTEMIS-2220 Fix PageCursorStressTest::testSimpleCursorWithFilter NPE FakeQueue is not correctly setting the queue on its PageSubscription, leading to fail the test due to NPEs when PageSubscription::getQueue is being used. You can merge this pull request into a Git repository by running: $ git pull https://github.com/franz1981/activemq-artemis ARTEMIS-2220 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2489.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2489 commit 32cb5271bb4f61c23c27ed3b7a3cda512e2648fc Author: Francesco Nigro Date: 2019-01-04T22:50:56Z ARTEMIS-2220 Fix PageCursorStressTest::testSimpleCursorWithFilter NPE FakeQueue is not correctly setting the queue on its PageSubscription, leading to fail the test due to NPEs when PageSubscription::getQueue is being used. ---
[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...
Github user wy96f commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2484#discussion_r245252912 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java --- @@ -278,21 +293,26 @@ public boolean isPaging() { lock.readLock().lock(); try { - if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { -return false; - } - if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) { -return isFull(); - } - if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) { -return isFull(); - } - return paging; + return isPagingDirtyRead(); } finally { lock.readLock().unlock(); } } + @Override + public boolean isPagingDirtyRead() { + if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { --- End diff --
> Yes, but we can just volatile load once before checking its value 3 times, on each call of isPagingDirtyRead
Got it. Nice catch :+1: ---
[GitHub] activemq-artemis pull request #2481: ARTEMIS-2213 don't expire critical comp...
Github user wy96f closed the pull request at: https://github.com/apache/activemq-artemis/pull/2481 ---
[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2484#discussion_r245239811 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java --- @@ -278,21 +293,26 @@ public boolean isPaging() { lock.readLock().lock(); try { - if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { -return false; - } - if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) { -return isFull(); - } - if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) { -return isFull(); - } - return paging; + return isPagingDirtyRead(); } finally { lock.readLock().unlock(); } } + @Override + public boolean isPagingDirtyRead() { + if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { --- End diff -- @wy96f what @franz1981 is trying to say, is we can do the volatile read just once, by adding one line e.g. AddressFullMessagePolicy addressFullMessagePolicy = this.addressFullMessagePolicy; if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { return false; } if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) { return isFull(); } if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) { return isFull(); } return paging; ---
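The one-line change suggested above can be shown in a self-contained form. This is a sketch, not PagingStoreImpl: the class `PagingFlagsSketch`, the nested `Policy` enum and the setter methods are stand-ins assumed for the example, and only the shape of `isPagingDirtyRead` follows the snippet in the comment.

```java
// Hypothetical stand-in for the relevant fields of PagingStoreImpl.
final class PagingFlagsSketch {

   enum Policy { PAGE, BLOCK, FAIL, DROP }

   // Can be replaced by another thread at any time, e.g. when address settings are reapplied.
   private volatile Policy policy = Policy.PAGE;
   private volatile boolean paging;
   private volatile boolean full;

   void setPolicy(Policy policy) {
      this.policy = policy;
   }

   void setPaging(boolean paging) {
      this.paging = paging;
   }

   void setFull(boolean full) {
      this.full = full;
   }

   boolean isPagingDirtyRead() {
      // Single volatile load; every comparison below sees the same value.
      final Policy current = this.policy;
      if (current == Policy.BLOCK) {
         return false;
      }
      if (current == Policy.FAIL || current == Policy.DROP) {
         return full;
      }
      return paging;
   }
}
```

Besides saving two volatile loads per call, reading into a local means the three checks cannot observe different policies if the field is changed halfway through the method.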
[GitHub] activemq-artemis pull request #2482: ARTEMIS-2214 Cache durable.
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2482#discussion_r245233815 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java --- @@ -120,14 +126,16 @@ public PagedReferenceImpl(final PagePosition position, this.largeMessage = message.getMessage().isLargeMessage() ? IS_LARGE_MESSAGE : IS_NOT_LARGE_MESSAGE; this.transactionID = message.getTransactionID(); this.messageID = message.getMessage().getMessageID(); - + this.durable = message.getMessage().isDurable() ? IS_DURABLE : IS_NOT_DURABLE; + this.deliveryTime = message.getMessage().getScheduledDeliveryTime(); //pre-cache the message size so we don't have to reload the message later if it is GC'd getPersistentSize(); } else { this.largeMessage = UNDEFINED_IS_LARGE_MESSAGE; this.transactionID = -2; this.messageID = -1; this.messageSize = -1; + this.durable = UNDEFINED_IS_DURABLE; --- End diff -- For completeness (it's a nit), set deliveryTime to its undefined value here. ---
[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2484#discussion_r245224854 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java --- @@ -278,21 +293,26 @@ public boolean isPaging() { lock.readLock().lock(); try { - if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { -return false; - } - if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) { -return isFull(); - } - if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) { -return isFull(); - } - return paging; + return isPagingDirtyRead(); } finally { lock.readLock().unlock(); } } + @Override + public boolean isPagingDirtyRead() { + if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { --- End diff -- Yes, but we can just volatile load once before checking its value 3 times, on each call of isPagingDirtyRead ---
[GitHub] activemq-artemis pull request #2482: ARTEMIS-2214 Cache durable in ...
Github user wy96f commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2482#discussion_r245195927 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java --- @@ -120,6 +128,8 @@ public PagedReferenceImpl(final PagePosition position, this.largeMessage = message.getMessage().isLargeMessage() ? IS_LARGE_MESSAGE : IS_NOT_LARGE_MESSAGE; this.transactionID = message.getTransactionID(); this.messageID = message.getMessage().getMessageID(); + this.priority = message.getMessage().getPriority(); --- End diff -- deliveryTime can be set in the constructor, like transactionID, messageID, etc. :) ---
[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...
Github user wy96f commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2484#discussion_r245183888 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java --- @@ -278,21 +293,26 @@ public boolean isPaging() { lock.readLock().lock(); try { - if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { -return false; - } - if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) { -return isFull(); - } - if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) { -return isFull(); - } - return paging; + return isPagingDirtyRead(); } finally { lock.readLock().unlock(); } } + @Override + public boolean isPagingDirtyRead() { + if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { --- End diff -- addressFullMessagePolicy can change if the address settings are reapplied, so we need to load the value. ---
[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2484#discussion_r245095523 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java --- @@ -1350,7 +1350,7 @@ public synchronized boolean hasNext() { return true; } - if (!pageStore.isPaging()) { + if (!pageStore.isPagingDirtyRead()) { --- End diff -- My concern here is that this isn't an async case. Btw, I cannot see the change in QueueImpl to use the new paging dirty read. ---
[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2484#discussion_r245094809 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java --- @@ -278,21 +293,26 @@ public boolean isPaging() { lock.readLock().lock(); try { - if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { -return false; - } - if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) { -return isFull(); - } - if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) { -return isFull(); - } - return paging; + return isPagingDirtyRead(); } finally { lock.readLock().unlock(); } } + @Override + public boolean isPagingDirtyRead() { + if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { --- End diff -- nice idea! ---
[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2483#discussion_r245090041 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java --- @@ -309,16 +309,17 @@ public void run() { */ @Override protected void performCachedLargeMessageDeletes() { - for (Long largeMsgId : largeMessagesToDelete) { - SequentialFile msg = createFileForLargeMessage(largeMsgId, LargeMessageExtension.DURABLE); + for (LargeServerMessage largeServerMessage : largeMessagesToDelete.values()) { --- End diff -- Usage of LongConcurrentHashMap looks much better. ---
[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2484#discussion_r245048241 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java --- @@ -278,21 +293,26 @@ public boolean isPaging() { lock.readLock().lock(); try { - if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { -return false; - } - if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) { -return isFull(); - } - if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) { -return isFull(); - } - return paging; + return isPagingDirtyRead(); } finally { lock.readLock().unlock(); } } + @Override + public boolean isPagingDirtyRead() { + if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) { --- End diff -- we can read it just once and save it in a local variable, avoiding 3 volatile loads: same can be done on the original version too ---
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245030347 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking. + * + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure, + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here. 
+ * + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock. + * + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations. + * + * There can only be one resettable iterable view, this is exposed at the top getPriority, + * and is intended for use in QueueImpl only. + * All other iterators are not reset-able and are created on calling iterator(). + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl extends AbstractCollection implements QueueConsumers { + + private final QueueConsumersIterator iterator = new QueueConsumersIterator<>(this, true); + + private volatile Level[] levels; + private volatile int size; + private volatile T first; + + private void setArray(Level[] array) { + this.levels = array; + } + + private Level[] getArray() { + return levels; + } + + + public QueueConsumersImpl() { + levels = newLevelArrayInstance(0); + } + + @SuppressWarnings("unchecked") + private static Level[] newLevelArrayInstance(int length) { + return (Level[]) Array.newInstance(Level.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public Set getPriorites() { + Level[] levels = getArray(); + return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + return new QueueConsumersIterator<>(this, false); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public QueueConsumers reset() { + iterator.reset(); + return this; + } + + @Override + public void forEach(Consumer 
action) { + Objects.requireNonNull(action); + Level[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].forEach(action); + } + } + + private Level getLevel(int level, boolean createIfMissing) { + Level[] current = getArray(); +
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245029912 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java --- @@ -25,20 +26,24 @@ private SimpleString filterString; + private int priority; --- End diff -- marking resolved. ---
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245029854 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import java.util.Iterator; + +public interface ResetableIterator extends Iterator { + + /** +* Resets the iterator so you can re-iterate over all elements. +* +* @return itself, this is just for convenience. +*/ + ResetableIterator reset(); --- End diff -- I'll mark this resolved then. ---
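As an illustration of the contract in the quoted interface, here is a minimal array-backed implementation. It rests on two assumptions: the type parameter `<T>` is restored by guesswork, since the archive appears to have stripped angle brackets from the diff, and `ArrayResetableIterator` is a hypothetical class that does not come from the pull request.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Local copy of the interface from the diff, with the generic parameter assumed.
interface ResetableIterator<T> extends Iterator<T> {
   ResetableIterator<T> reset();
}

// Hypothetical implementation that iterates over a private copy of an array.
final class ArrayResetableIterator<T> implements ResetableIterator<T> {

   private final T[] elements;
   private int cursor;

   ArrayResetableIterator(T[] elements) {
      // Defensive copy so later changes to the caller's array are not observed.
      this.elements = elements.clone();
   }

   @Override
   public boolean hasNext() {
      return cursor < elements.length;
   }

   @Override
   public T next() {
      if (!hasNext()) {
         throw new NoSuchElementException();
      }
      return elements[cursor++];
   }

   @Override
   public ResetableIterator<T> reset() {
      cursor = 0;
      return this; // returned for call chaining, as the Javadoc in the diff describes
   }
}
```

Returning the iterator from `reset()` allows a call such as `it.reset().next()` to restart and read the first element in one expression.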
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245029745 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java --- @@ -2497,45 +2494,36 @@ private void deliver() { handled++; - + consumers.reset(); continue; } if (logger.isTraceEnabled()) { logger.trace("Queue " + this.getName() + " is delivering reference " + ref); } - // If a group id is set, then this overrides the consumer chosen round-robin + final SimpleString groupID = extractGroupID(ref); + groupConsumer = getGroupConsumer(groupConsumer, groupID); - SimpleString groupID = extractGroupID(ref); - - if (groupID != null) { - groupConsumer = groups.get(groupID); - - if (groupConsumer != null) { - consumer = groupConsumer; - } - } - - if (exclusive && redistributor == null) { - consumer = consumerList.get(0).consumer; + if (groupConsumer != null) { + consumer = groupConsumer; } HandleStatus status = handle(ref, consumer); if (status == HandleStatus.HANDLED) { - deliveriesInTransit.countUp(); - - handledconsumer = consumer; - - removeMessageReference(holder, ref); - if (redistributor == null) { handleMessageGroup(ref, consumer, groupConsumer, groupID); } + deliveriesInTransit.countUp(); + + + removeMessageReference(holder, ref); + handledconsumer = consumer; handled++; + consumers.reset(); --- End diff -- Resolving, as discussed elsewhere. ---
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245029528 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking. + * + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure, + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here. 
+ * + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock. + * + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations. + * + * There can only be one resettable iterable view, this is exposed at the top getPriority, + * and is intended for use in QueueImpl only. + * All other iterators are not reset-able and are created on calling iterator(). + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl extends AbstractCollection implements QueueConsumers { + + private final QueueConsumersIterator iterator = new QueueConsumersIterator<>(this, true); + + private volatile Level[] levels; + private volatile int size; + private volatile T first; + + private void setArray(Level[] array) { + this.levels = array; + } + + private Level[] getArray() { + return levels; + } + + + public QueueConsumersImpl() { + levels = newLevelArrayInstance(0); + } + + @SuppressWarnings("unchecked") + private static Level[] newLevelArrayInstance(int length) { + return (Level[]) Array.newInstance(Level.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public Set getPriorites() { + Level[] levels = getArray(); + return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + return new QueueConsumersIterator<>(this, false); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public QueueConsumers reset() { + iterator.reset(); + return this; + } + + @Override + public void forEach(Consumer 
action) { + Objects.requireNonNull(action); + Level[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].forEach(action); + } + } + + private Level getLevel(int level, boolean createIfMissing) { + Level[] current = getArray(); +
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245014300 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import java.util.Iterator; + +public interface ResetableIterator extends Iterator { + + /** +* Resets the iterator so you can re-iterate over all elements. +* +* @return itself, this is just for convenience. +*/ + ResetableIterator reset(); --- End diff -- Got it, thanks! ---
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245013090 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking. + * + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure, + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here. 
+ * + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock. + * + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations. + * + * There can only be one resettable iterable view, this is exposed at the top getPriority, + * and is intended for use in QueueImpl only. + * All other iterators are not reset-able and are created on calling iterator(). + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl extends AbstractCollection implements QueueConsumers { + + private final QueueConsumersIterator iterator = new QueueConsumersIterator<>(this, true); + + private volatile Level[] levels; + private volatile int size; + private volatile T first; + + private void setArray(Level[] array) { + this.levels = array; + } + + private Level[] getArray() { + return levels; + } + + + public QueueConsumersImpl() { + levels = newLevelArrayInstance(0); + } + + @SuppressWarnings("unchecked") + private static Level[] newLevelArrayInstance(int length) { + return (Level[]) Array.newInstance(Level.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public Set getPriorites() { + Level[] levels = getArray(); + return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + return new QueueConsumersIterator<>(this, false); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public QueueConsumers reset() { + iterator.reset(); + return this; + } + + @Override + public void forEach(Consumer 
action) { + Objects.requireNonNull(action); + Level[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].forEach(action); + } + } + + private Level getLevel(int level, boolean createIfMissing) { + Level[] current = getArray(); +
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245011975 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import java.util.Iterator; + +public interface ResetableIterator extends Iterator { + + /** +* Resets the iterator so you can re-iterate over all elements. +* +* @return itself, this is just for convenience. +*/ + ResetableIterator reset(); --- End diff -- -1. We are not closing the iterator, nor would this go in a try-with-resources block. We are simply resetting the iterator so that it marks endpos = startpos, so we continue to round-robin after successfully handling a message. ---
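For readers following the thread, the reset semantics described in this comment can be sketched roughly as below. This is only an illustration under stated assumptions, not the code from the pull request: the names RoundRobinSketch, ResettableIterator and ArrayRoundRobin are hypothetical, and a fixed array stands in for the consumer collection. A pass hands out each element once; reset() opens a new full pass starting from the current position rather than from the first element, which is what keeps delivery round-robin.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical sketch, not the PR's implementation.
final class RoundRobinSketch {

    // Assumed shape of the interface under review, with a type parameter added.
    interface ResettableIterator<T> extends Iterator<T> {
        ResettableIterator<T> reset();
    }

    static final class ArrayRoundRobin<T> implements ResettableIterator<T> {
        private final T[] items;
        private int pos;        // index of the next element to hand out
        private int remaining;  // elements left in the current pass

        ArrayRoundRobin(T[] items) {
            this.items = items;
            this.remaining = items.length;
        }

        @Override
        public boolean hasNext() {
            return remaining > 0;
        }

        @Override
        public T next() {
            if (remaining == 0) {
                throw new NoSuchElementException();
            }
            T item = items[pos];
            pos = (pos + 1) % items.length;
            remaining--;
            return item;
        }

        // Starts a new full pass from the current position; nothing is closed or released.
        @Override
        public ResettableIterator<T> reset() {
            remaining = items.length;
            return this;
        }
    }

    public static void main(String[] args) {
        ArrayRoundRobin<String> consumers = new ArrayRoundRobin<>(new String[] {"a", "b", "c"});
        System.out.println(consumers.next()); // first consumer handles a message
        consumers.reset();                    // handled, so begin a fresh pass
        while (consumers.hasNext()) {
            System.out.println(consumers.next());
        }
    }
}
```

The point of the design is that reset() only moves markers, so it has nothing in common with close() and would not belong in a try-with-resources block.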
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245012214 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking. + * + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure, + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here. 
+ * + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock. + * + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations. + * + * There can only be one resettable iterable view, this is exposed at the top getPriority, + * and is intended for use in QueueImpl only. + * All other iterators are not reset-able and are created on calling iterator(). + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl extends AbstractCollection implements QueueConsumers { + + private final QueueConsumersIterator iterator = new QueueConsumersIterator<>(this, true); + + private volatile Level[] levels; + private volatile int size; + private volatile T first; + + private void setArray(Level[] array) { + this.levels = array; + } + + private Level[] getArray() { + return levels; + } + + + public QueueConsumersImpl() { + levels = newLevelArrayInstance(0); + } + + @SuppressWarnings("unchecked") + private static Level[] newLevelArrayInstance(int length) { + return (Level[]) Array.newInstance(Level.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public Set getPriorites() { + Level[] levels = getArray(); + return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + return new QueueConsumersIterator<>(this, false); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public QueueConsumers reset() { + iterator.reset(); + return this; + } + + @Override + public void forEach(Consumer 
action) { + Objects.requireNonNull(action); + Level[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].forEach(action); + } + } + + private Level getLevel(int level, boolean createIfMissing) { + Level[] current = getArray(); +
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245010184 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking. + * + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure, + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here. 
+ * + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock. + * + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations. + * + * There can only be one resettable iterable view, this is exposed at the top getPriority, + * and is intended for use in QueueImpl only. + * All other iterators are not reset-able and are created on calling iterator(). + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl extends AbstractCollection implements QueueConsumers { + + private final QueueConsumersIterator iterator = new QueueConsumersIterator<>(this, true); + + private volatile Level[] levels; + private volatile int size; + private volatile T first; + + private void setArray(Level[] array) { + this.levels = array; + } + + private Level[] getArray() { + return levels; + } + + + public QueueConsumersImpl() { + levels = newLevelArrayInstance(0); + } + + @SuppressWarnings("unchecked") + private static Level[] newLevelArrayInstance(int length) { + return (Level[]) Array.newInstance(Level.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public Set getPriorites() { + Level[] levels = getArray(); + return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + return new QueueConsumersIterator<>(this, false); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public QueueConsumers reset() { + iterator.reset(); + return this; + } + + @Override + public void forEach(Consumer 
action) { + Objects.requireNonNull(action); + Level[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].forEach(action); + } + } + + private Level getLevel(int level, boolean createIfMissing) { + Level[] current = getArray(); + int low =
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245009418 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java --- @@ -95,6 +109,7 @@ public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeNullableSimpleString(filterString); buffer.writeBoolean(browseOnly); buffer.writeBoolean(requiresResponse); + buffer.writeInt(priority); --- End diff -- we plan to support int. ---
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245009864 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java --- @@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext protonSender, return consumer; } + private int getPriority(Map properties) { + Integer value = properties == null ? null : (Integer) properties.get(PRIORITY); --- End diff -- Users in AMQP land will expect Integer. See Qpid: https://qpid.apache.org/releases/qpid-broker-j-7.0.6/book/Java-Broker-Runtime-Consumers.html#Java-Broker-Runtime-Consumers-Prioirty ---
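The quoted diff is cut off after the first statement of getPriority, so the rest of the method is not visible here. A minimal sketch of a null-safe lookup in the same spirit might look like the following. Everything beyond the visible line is an assumption: the String key, the default of 0, and the class name ConsumerPrioritySketch are all hypothetical, and the instanceof check is a defensive variation on the cast shown in the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; the real method body is truncated in the quoted diff.
final class ConsumerPrioritySketch {

    static final String PRIORITY = "priority"; // assumed key, for illustration only
    static final int DEFAULT_PRIORITY = 0;     // assumed default, for illustration only

    // Returns the Integer stored under PRIORITY, or the default when the map is
    // null, the key is absent, or the value is not an Integer.
    static int getPriority(Map<String, Object> properties) {
        Object value = properties == null ? null : properties.get(PRIORITY);
        return value instanceof Integer ? (Integer) value : DEFAULT_PRIORITY;
    }

    public static void main(String[] args) {
        Map<String, Object> properties = new HashMap<>();
        properties.put(PRIORITY, 7);
        System.out.println(getPriority(properties));
        System.out.println(getPriority(null));
    }
}
```

Accepting only Integer matches the expectation stated in the comment; whether other numeric types should be tolerated is a separate decision that this sketch does not try to settle.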
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245008661 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java --- @@ -3080,45 +3053,20 @@ private boolean deliverDirect(final MessageReference ref) { return true; } - int startPos = pos; - - int size = consumerList.size(); + consumers.reset(); - while (true) { -ConsumerHolder holder; -if (redistributor == null) { - holder = consumerList.get(pos); -} else { - holder = redistributor; -} + while (consumers.hasNext() || redistributor != null) { --- End diff -- Trying to keep to standard interfaces; these are the standard Iterator methods. Also, at this point in the while loop we do not want to actually get the next element. There are some timeouts and other checks that need to be done first, so next() is called a little later on. ---
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245009338 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java --- @@ -25,20 +26,24 @@ private SimpleString filterString; + private int priority; --- End diff -- We should support int, as AMQP users can use -2^31 to 2^31-1. This changes nothing about the message size and raises no space concerns; it applies only on consumer creation. ---
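To make the size argument concrete, here is a small sketch of writing and reading a priority as a 4-byte int. It is only an illustration: java.nio.ByteBuffer stands in for the ActiveMQBuffer used in the diff, and the names PriorityWireSketch, encode and decode are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch; ByteBuffer stands in for the broker's own buffer type.
final class PriorityWireSketch {

    // Writes the priority as a 4-byte int, analogous to buffer.writeInt(priority).
    static ByteBuffer encode(int priority) {
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
        buffer.putInt(priority);
        buffer.flip(); // switch from writing to reading
        return buffer;
    }

    // Reads the priority back from the current position.
    static int decode(ByteBuffer buffer) {
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // The full signed range round-trips in four bytes.
        System.out.println(decode(encode(Integer.MIN_VALUE)));
        System.out.println(decode(encode(Integer.MAX_VALUE)));
    }
}
```

Because the field travels only in the create-consumer packet, those four bytes are paid once per consumer rather than once per message.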
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245008000 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java --- @@ -2497,45 +2494,36 @@ private void deliver() { handled++; - + consumers.reset(); continue; } if (logger.isTraceEnabled()) { logger.trace("Queue " + this.getName() + " is delivering reference " + ref); } - // If a group id is set, then this overrides the consumer chosen round-robin + final SimpleString groupID = extractGroupID(ref); + groupConsumer = getGroupConsumer(groupConsumer, groupID); - SimpleString groupID = extractGroupID(ref); - - if (groupID != null) { - groupConsumer = groups.get(groupID); - - if (groupConsumer != null) { - consumer = groupConsumer; - } - } - - if (exclusive && redistributor == null) { - consumer = consumerList.get(0).consumer; + if (groupConsumer != null) { + consumer = groupConsumer; } HandleStatus status = handle(ref, consumer); if (status == HandleStatus.HANDLED) { - deliveriesInTransit.countUp(); - - handledconsumer = consumer; - - removeMessageReference(holder, ref); - if (redistributor == null) { handleMessageGroup(ref, consumer, groupConsumer, groupID); } + deliveriesInTransit.countUp(); + + + removeMessageReference(holder, ref); + handledconsumer = consumer; handled++; + consumers.reset(); --- End diff -- That is not the intent here. The intent of reset is just to move the iterator markers; the iterator is not a resource. ---
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245007330 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking. + * + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure, + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here. 
+ * + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock. + * + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations. + * + * There can only be one resettable iterable view, this is exposed at the top getPriority, + * and is intended for use in QueueImpl only. + * All other iterators are not reset-able and are created on calling iterator(). + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl extends AbstractCollection implements QueueConsumers { + + private final QueueConsumersIterator iterator = new QueueConsumersIterator<>(this, true); + + private volatile Level[] levels; + private volatile int size; + private volatile T first; + + private void setArray(Level[] array) { + this.levels = array; + } + + private Level[] getArray() { + return levels; + } + + + public QueueConsumersImpl() { + levels = newLevelArrayInstance(0); + } + + @SuppressWarnings("unchecked") + private static Level[] newLevelArrayInstance(int length) { + return (Level[]) Array.newInstance(Level.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public Set getPriorites() { + Level[] levels = getArray(); + return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + return new QueueConsumersIterator<>(this, false); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public QueueConsumers reset() { + iterator.reset(); + return this; + } + + @Override + public void forEach(Consumer 
action) { + Objects.requireNonNull(action); + Level[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].forEach(action); + } + } + + private Level getLevel(int level, boolean createIfMissing) { + Level[] current = getArray(); +
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245007052 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking. + * + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure, + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here. 
+ * + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock. + * + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations. + * + * There can only be one resettable iterable view, this is exposed at the top getPriority, + * and is intended for use in QueueImpl only. + * All other iterators are not reset-able and are created on calling iterator(). + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl extends AbstractCollection implements QueueConsumers { + + private final QueueConsumersIterator iterator = new QueueConsumersIterator<>(this, true); + + private volatile Level[] levels; + private volatile int size; + private volatile T first; + + private void setArray(Level[] array) { + this.levels = array; + } + + private Level[] getArray() { + return levels; + } + + + public QueueConsumersImpl() { + levels = newLevelArrayInstance(0); + } + + @SuppressWarnings("unchecked") + private static Level[] newLevelArrayInstance(int length) { + return (Level[]) Array.newInstance(Level.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public Set getPriorites() { + Level[] levels = getArray(); + return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + return new QueueConsumersIterator<>(this, false); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public QueueConsumers reset() { + iterator.reset(); + return this; + } + + @Override + public void forEach(Consumer 
action) { + Objects.requireNonNull(action); + Level[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].forEach(action); + } + } + + private Level getLevel(int level, boolean createIfMissing) { + Level[] current = getArray(); +
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245006670 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking. + * + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure, + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here. 
+ * + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock. + * + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations. + * + * There can only be one resettable iterable view, this is exposed at the top getPriority, + * and is intended for use in QueueImpl only. + * All other iterators are not reset-able and are created on calling iterator(). + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl extends AbstractCollection implements QueueConsumers { + + private final QueueConsumersIterator iterator = new QueueConsumersIterator<>(this, true); + + private volatile Level[] levels; + private volatile int size; + private volatile T first; + + private void setArray(Level[] array) { + this.levels = array; + } + + private Level[] getArray() { + return levels; + } + + + public QueueConsumersImpl() { + levels = newLevelArrayInstance(0); + } + + @SuppressWarnings("unchecked") + private static Level[] newLevelArrayInstance(int length) { + return (Level[]) Array.newInstance(Level.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public Set getPriorites() { + Level[] levels = getArray(); + return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + return new QueueConsumersIterator<>(this, false); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public QueueConsumers reset() { + iterator.reset(); + return this; + } + + @Override + public void forEach(Consumer 
action) { + Objects.requireNonNull(action); + Level[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].forEach(action); + } + } + + private Level getLevel(int level, boolean createIfMissing) { + Level[] current = getArray(); +
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245006519 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java --- @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.PriorityAware; + +import java.util.Collection; +import java.util.Set; + +public interface QueueConsumers extends Collection { + + Set getPriorites(); --- End diff -- Priority is of Integer type, not byte. ---
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r245006326 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java --- @@ -21,7 +21,7 @@ import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; -public interface Consumer { +public interface Consumer extends PriorityAware { --- End diff -- Correct, but it's kept separate to make testing of the QueueConsumersImpl logic easier and quite self-contained. It also makes it easier to reuse the new QueueConsumersImpl for other PriorityAware needs. ---
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r244987340 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java --- @@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext protonSender, return consumer; } + private int getPriority(Map properties) { + Integer value = properties == null ? null : (Integer) properties.get(PRIORITY); --- End diff -- I would use a cast to `Number` and call `Number::byteValue` ---
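A minimal sketch of the `Number` cast suggested above, assuming the properties map may carry the priority as any boxed numeric type. The class name, the `String` key type and the default of 0 are illustrative assumptions and are not taken from the pull request.

```java
import java.util.HashMap;
import java.util.Map;

public class PriorityLookup {

   // Hypothetical key; the real code has its own PRIORITY constant.
   static final String PRIORITY = "priority";

   // Hypothetical default used when no priority is supplied.
   static final byte DEFAULT_PRIORITY = 0;

   static byte getPriority(Map<String, Object> properties) {
      Object value = properties == null ? null : properties.get(PRIORITY);
      // A cast to Number accepts Integer, Long, Short and Byte alike,
      // whereas a cast to Integer throws ClassCastException for the others.
      return value instanceof Number ? ((Number) value).byteValue() : DEFAULT_PRIORITY;
   }

   public static void main(String[] args) {
      Map<String, Object> props = new HashMap<>();
      props.put(PRIORITY, 5L); // a Long, which an (Integer) cast would reject
      System.out.println(getPriority(props));
      System.out.println(getPriority(null));
   }
}
```

Note that `byteValue()` silently truncates values outside the byte range. If the priority stays an `int`, `Number::intValue` can be used in the same way.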
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r244986539 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java --- @@ -95,6 +109,7 @@ public void encodeRest(final ActiveMQBuffer buffer) { buffer.writeNullableSimpleString(filterString); buffer.writeBoolean(browseOnly); buffer.writeBoolean(requiresResponse); + buffer.writeInt(priority); --- End diff -- I would write it as a byte if we don't plan to support more than 127 priorities consumer-side, but as I've said it is a negligible saving of space on the wire ---
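To illustrate the trade-off in the comment above, here is a rough sketch of a one-byte encoding with a range check. It uses `java.nio.ByteBuffer` as a stand-in for the broker's own buffer type, and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

public class PriorityWireFormat {

   // Encodes the priority in one byte instead of four and rejects values that do not fit.
   static void encodePriority(ByteBuffer buffer, int priority) {
      if (priority < Byte.MIN_VALUE || priority > Byte.MAX_VALUE) {
         throw new IllegalArgumentException("priority out of byte range: " + priority);
      }
      buffer.put((byte) priority);
   }

   // Reads the single byte back; Java sign-extends it to int on return.
   static int decodePriority(ByteBuffer buffer) {
      return buffer.get();
   }

   public static void main(String[] args) {
      ByteBuffer buffer = ByteBuffer.allocate(1);
      encodePriority(buffer, 127);
      buffer.flip();
      System.out.println(decodePriority(buffer));
   }
}
```

The saving is three bytes per create-consumer message, at the cost of capping priorities at `Byte.MAX_VALUE`.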
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r244986350 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java --- @@ -25,20 +26,24 @@ private SimpleString filterString; + private int priority; --- End diff -- I would change it to `byte` if we are not planning to support priorities > 127, i.e. `Byte.MAX_VALUE`. For small messages it *could* save some space (TBH, to be verified with the JOL tool) ---
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r244985399 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import java.util.Iterator; + +public interface ResetableIterator extends Iterator { + + /** +* Resets the iterator so you can re-iterate over all elements. +* +* @return itself, this is just for convenience. +*/ + ResetableIterator reset(); --- End diff -- We can extend `AutoCloseable` too and override `close` so that it calls `reset` by default ---
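One way the `AutoCloseable` suggestion above might look, as a sketch only. The generic parameter is an assumption (the archived diff has lost its type arguments), and `ListResetableIterator` is a hypothetical list-backed implementation added purely for demonstration.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class ResetOnCloseExample {

   // Sketch of the interface from the diff, extended as suggested.
   interface ResetableIterator<T> extends Iterator<T>, AutoCloseable {
      ResetableIterator<T> reset();

      // Narrowed to throw no checked exception, so callers need no catch block.
      @Override
      default void close() {
         reset();
      }
   }

   // Hypothetical list-backed implementation, only for demonstration.
   static final class ListResetableIterator<T> implements ResetableIterator<T> {
      private final List<T> items;
      private int pos;

      ListResetableIterator(List<T> items) {
         this.items = items;
      }

      @Override
      public boolean hasNext() {
         return pos < items.size();
      }

      @Override
      public T next() {
         if (!hasNext()) {
            throw new NoSuchElementException();
         }
         return items.get(pos++);
      }

      @Override
      public ResetableIterator<T> reset() {
         pos = 0;
         return this;
      }
   }

   public static void main(String[] args) {
      ListResetableIterator<String> it = new ListResetableIterator<>(Arrays.asList("a", "b"));
      try (ResetableIterator<String> view = it) {
         view.next(); // consume one element, then leave the block
      }
      System.out.println(it.next()); // starts from the first element again
   }
}
```

Declaring `close()` without a `throws` clause keeps `try-with-resources` usable without a surrounding `catch`.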
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r244984864 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java --- @@ -3080,45 +3053,20 @@ private boolean deliverDirect(final MessageReference ref) { return true; } - int startPos = pos; - - int size = consumerList.size(); + consumers.reset(); - while (true) { -ConsumerHolder holder; -if (redistributor == null) { - holder = consumerList.get(pos); -} else { - holder = redistributor; -} + while (consumers.hasNext() || redistributor != null) { --- End diff -- Just thinking out loud: given that `consumers::hasNext` is mostly used with a `next` after it, why not provide just a `pollNext` method that returns the consumer or `null` if there isn't any? ---
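A small sketch of the `pollNext` idea from the comment above, written as a static helper over a plain `java.util.Iterator`. The helper and class names are hypothetical; in the pull request it would presumably be a method on the consumers collection itself.

```java
import java.util.Arrays;
import java.util.Iterator;

public class PollNextExample {

   // Hypothetical helper: one call instead of the hasNext()/next() pair.
   // Returns null when the iterator is exhausted.
   static <T> T pollNext(Iterator<T> iterator) {
      return iterator.hasNext() ? iterator.next() : null;
   }

   public static void main(String[] args) {
      Iterator<String> consumers = Arrays.asList("c1", "c2").iterator();
      String consumer;
      while ((consumer = pollNext(consumers)) != null) {
         System.out.println(consumer);
      }
   }
}
```

This only works when the collection never holds `null` elements, since `null` doubles as the end-of-iteration signal.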
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r244984274 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java --- @@ -2497,45 +2494,36 @@ private void deliver() { handled++; - + consumers.reset(); continue; } if (logger.isTraceEnabled()) { logger.trace("Queue " + this.getName() + " is delivering reference " + ref); } - // If a group id is set, then this overrides the consumer chosen round-robin + final SimpleString groupID = extractGroupID(ref); + groupConsumer = getGroupConsumer(groupConsumer, groupID); - SimpleString groupID = extractGroupID(ref); - - if (groupID != null) { - groupConsumer = groups.get(groupID); - - if (groupConsumer != null) { - consumer = groupConsumer; - } - } - - if (exclusive && redistributor == null) { - consumer = consumerList.get(0).consumer; + if (groupConsumer != null) { + consumer = groupConsumer; } HandleStatus status = handle(ref, consumer); if (status == HandleStatus.HANDLED) { - deliveriesInTransit.countUp(); - - handledconsumer = consumer; - - removeMessageReference(holder, ref); - if (redistributor == null) { handleMessageGroup(ref, consumer, groupConsumer, groupID); } + deliveriesInTransit.countUp(); + + + removeMessageReference(holder, ref); + handledconsumer = consumer; handled++; + consumers.reset(); --- End diff -- I would try to put the `consumers.reset` into a `try..finally` block, or wrap the reset call in an `AutoCloseable::close` method to force `try-with-resources` usage too ---
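The `try..finally` variant mentioned above could be sketched roughly as follows. `Cursor` is a hypothetical stand-in for the resettable consumer view, and `firstAccepting` is an invented method that only mimics the shape of a delivery loop with several exit paths.

```java
import java.util.Arrays;
import java.util.List;

public class ResetInFinallyExample {

   // Hypothetical stand-in for the resettable consumer view.
   static final class Cursor {
      private final List<String> items;
      int pos;

      Cursor(List<String> items) {
         this.items = items;
      }

      boolean hasNext() {
         return pos < items.size();
      }

      String next() {
         return items.get(pos++);
      }

      void reset() {
         pos = 0;
      }
   }

   // Tries each consumer in turn; the cursor is rewound on every exit path.
   static String firstAccepting(Cursor consumers, String wanted) {
      try {
         while (consumers.hasNext()) {
            String candidate = consumers.next();
            if (candidate.equals(wanted)) {
               return candidate; // early return still runs the finally block
            }
         }
         return null;
      } finally {
         consumers.reset();
      }
   }

   public static void main(String[] args) {
      Cursor consumers = new Cursor(Arrays.asList("c1", "c2", "c3"));
      System.out.println(firstAccepting(consumers, "c2"));
      System.out.println(consumers.pos);
   }
}
```

Placing the reset in `finally` means a newly added `return`, `continue` or exception path cannot forget it, which is the risk with the scattered `consumers.reset()` calls in the diff.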
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r244982460 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking. + * + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure, + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here. 
+ * + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock. + * + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations. + * + * There can only be one resettable iterable view, this is exposed at the top getPriority, + * and is intended for use in QueueImpl only. + * All other iterators are not reset-able and are created on calling iterator(). + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl extends AbstractCollection implements QueueConsumers { + + private final QueueConsumersIterator iterator = new QueueConsumersIterator<>(this, true); + + private volatile Level[] levels; + private volatile int size; + private volatile T first; + + private void setArray(Level[] array) { + this.levels = array; + } + + private Level[] getArray() { + return levels; + } + + + public QueueConsumersImpl() { + levels = newLevelArrayInstance(0); + } + + @SuppressWarnings("unchecked") + private static Level[] newLevelArrayInstance(int length) { + return (Level[]) Array.newInstance(Level.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public Set getPriorites() { + Level[] levels = getArray(); + return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + return new QueueConsumersIterator<>(this, false); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public QueueConsumers reset() { + iterator.reset(); + return this; + } + + @Override + public void forEach(Consumer 
action) { + Objects.requireNonNull(action); + Level[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].forEach(action); + } + } + + private Level getLevel(int level, boolean createIfMissing) { + Level[] current = getArray(); + int low =
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r244980096 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking. + * + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure, + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here. 
+ * + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock. + * + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations. + * + * There can only be one resettable iterable view, this is exposed at the top getPriority, + * and is intended for use in QueueImpl only. + * All other iterators are not reset-able and are created on calling iterator(). + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl extends AbstractCollection implements QueueConsumers { + + private final QueueConsumersIterator iterator = new QueueConsumersIterator<>(this, true); + + private volatile Level[] levels; + private volatile int size; + private volatile T first; + + private void setArray(Level[] array) { + this.levels = array; + } + + private Level[] getArray() { + return levels; + } + + + public QueueConsumersImpl() { + levels = newLevelArrayInstance(0); + } + + @SuppressWarnings("unchecked") + private static Level[] newLevelArrayInstance(int length) { + return (Level[]) Array.newInstance(Level.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public Set getPriorites() { + Level[] levels = getArray(); + return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + return new QueueConsumersIterator<>(this, false); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public QueueConsumers reset() { + iterator.reset(); + return this; + } + + @Override + public void forEach(Consumer 
action) { + Objects.requireNonNull(action); + Level[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].forEach(action); + } + } + + private Level getLevel(int level, boolean createIfMissing) { + Level[] current = getArray(); + int low =
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r244979227 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking. + * + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure, + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here. 
+ * + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock. + * + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations. + * + * There can only be one resettable iterable view, this is exposed at the top getPriority, + * and is intended for use in QueueImpl only. + * All other iterators are not reset-able and are created on calling iterator(). + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl extends AbstractCollection implements QueueConsumers { + + private final QueueConsumersIterator iterator = new QueueConsumersIterator<>(this, true); + + private volatile Level[] levels; + private volatile int size; + private volatile T first; + + private void setArray(Level[] array) { + this.levels = array; + } + + private Level[] getArray() { + return levels; + } + + + public QueueConsumersImpl() { + levels = newLevelArrayInstance(0); + } + + @SuppressWarnings("unchecked") + private static Level[] newLevelArrayInstance(int length) { + return (Level[]) Array.newInstance(Level.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public Set getPriorites() { + Level[] levels = getArray(); + return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + return new QueueConsumersIterator<>(this, false); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public QueueConsumers reset() { + iterator.reset(); + return this; + } + + @Override + public void forEach(Consumer 
action) { + Objects.requireNonNull(action); + Level[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].forEach(action); + } + } + + private Level getLevel(int level, boolean createIfMissing) { + Level[] current = getArray(); + int low =
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r244979032 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java --- @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.PriorityAware; + +import java.lang.reflect.Array; +import java.util.AbstractCollection; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking. + * + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure, + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here. 
+ * + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock. + * + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations. + * + * There can only be one resettable iterable view, this is exposed at the top getPriority, + * and is intended for use in QueueImpl only. + * All other iterators are not reset-able and are created on calling iterator(). + * + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these. + * + * @param The type this class may hold, this is generic as can be anything that extends PriorityAware, + * but intent is this is the QueueImpl:ConsumerHolder. + */ +public class QueueConsumersImpl extends AbstractCollection implements QueueConsumers { + + private final QueueConsumersIterator iterator = new QueueConsumersIterator<>(this, true); + + private volatile Level[] levels; + private volatile int size; + private volatile T first; + + private void setArray(Level[] array) { + this.levels = array; + } + + private Level[] getArray() { + return levels; + } + + + public QueueConsumersImpl() { + levels = newLevelArrayInstance(0); + } + + @SuppressWarnings("unchecked") + private static Level[] newLevelArrayInstance(int length) { + return (Level[]) Array.newInstance(Level.class, length); + } + + @Override + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size() == 0; + } + + @Override + public Set getPriorites() { + Level[] levels = getArray(); + return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet()); + } + + @Override + public Iterator iterator() { + return new QueueConsumersIterator<>(this, false); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + return iterator.next(); + } + + @Override + public QueueConsumers reset() { + iterator.reset(); + return this; + } + + @Override + public void forEach(Consumer 
action) { + Objects.requireNonNull(action); + Level[] current = getArray(); + int len = current.length; + for (int i = 0; i < len; ++i) { + current[i].forEach(action); + } + } + + private Level getLevel(int level, boolean createIfMissing) { + Level[] current = getArray(); + int low =
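The Javadoc in the diff above describes a copy-on-write design: writers are serialized by a single re-entrant lock, and iterators walk a snapshot of the internal array. A minimal sketch of that general technique follows. It is not the PR's `QueueConsumersImpl`, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration of a copy-on-write holder; not the class from the PR.
final class CopyOnWriteSketch<T> implements Iterable<T> {

   // All mutations go through this single lock.
   private final ReentrantLock lock = new ReentrantLock();

   // Readers only ever see a fully built array through this volatile reference.
   private volatile Object[] array = new Object[0];

   void add(T item) {
      lock.lock();
      try {
         Object[] current = array;
         // Copy, modify the copy, then publish it; the old array is never mutated.
         Object[] copy = Arrays.copyOf(current, current.length + 1);
         copy[current.length] = item;
         array = copy;
      } finally {
         lock.unlock();
      }
   }

   @Override
   public Iterator<T> iterator() {
      // The iterator captures the array once, so later add() calls are invisible to it.
      final Object[] snapshot = array;
      return new Iterator<T>() {
         private int index;

         @Override
         public boolean hasNext() {
            return index < snapshot.length;
         }

         @Override
         @SuppressWarnings("unchecked")
         public T next() {
            if (!hasNext()) {
               throw new NoSuchElementException();
            }
            return (T) snapshot[index++];
         }
      };
   }
}
```

Reads never block, at the cost of an array copy on every mutation, which suits a collection that is iterated far more often than it is changed.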
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r244978399 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java --- @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import org.apache.activemq.artemis.core.server.PriorityAware; + +import java.util.Collection; +import java.util.Set; + +public interface QueueConsumers extends Collection { + + Set getPriorites(); --- End diff -- I know that's just a test method, so feel free to ignore me, but I would just use a byte[] here :P ---
[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2483#discussion_r244977454 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java --- @@ -727,7 +725,7 @@ private void checkAndCreateDir(final File dir, final boolean create) { List idList = new ArrayList<>(); for (String filename : filenames) { Long id = getLargeMessageIdFromFilename(filename); --- End diff -- you can just use a primitive `long` here ---
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2488#discussion_r244976790 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java --- @@ -21,7 +21,7 @@ import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; -public interface Consumer { +public interface Consumer extends PriorityAware { --- End diff -- Just a design question: why use a specific `PriorityAware` interface? I'm assuming that we can no longer have a `Consumer` that doesn't provide a `default int getPriority()` ---
[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...
Github user CNNJYB commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2483#discussion_r244976446 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java --- @@ -309,16 +309,17 @@ public void run() { */ @Override protected void performCachedLargeMessageDeletes() { - for (Long largeMsgId : largeMessagesToDelete) { - SequentialFile msg = createFileForLargeMessage(largeMsgId, LargeMessageExtension.DURABLE); + for (LargeServerMessage largeServerMessage : largeMessagesToDelete.values()) { --- End diff -- @michaelandrepearce update, please review, Thanks. ---
[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority
GitHub user michaelandrepearce opened a pull request: https://github.com/apache/activemq-artemis/pull/2488 ARTEMIS-196 Implement Consumer Priority Add consumer priority support Includes refactor of consumer iterating in QueueImpl to its own logical class, to be able to implement. Add OpenWire JMS Test - taken from ActiveMQ5 Add Core JMS Test Add AMQP Test Add Docs You can merge this pull request into a Git repository by running: $ git pull https://github.com/michaelandrepearce/activemq-artemis ARTEMIS-196 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2488.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2488 commit 61a91701f3d424d31a83d9942f7786c90ac81559 Author: Michael André Pearce Date: 2018-12-31T13:22:02Z ARTEMIS-196 Implement Consumer Priority Add consumer priority support Includes refactor of consumer iterating in QueueImpl to its own logical class, to be able to implement. Add OpenWire JMS Test - taken from ActiveMQ5 Add Core JMS Test Add AMQP Test Add Docs ---
[GitHub] activemq-artemis pull request #2487: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce closed the pull request at: https://github.com/apache/activemq-artemis/pull/2487 ---
[GitHub] activemq-artemis pull request #2487: ARTEMIS-196 Implement Consumer Priority
GitHub user michaelandrepearce opened a pull request: https://github.com/apache/activemq-artemis/pull/2487 ARTEMIS-196 Implement Consumer Priority Add consumer priority support Includes refactor of consumer iterating in queueimpl to its own logical class, to be able to implement. Add OpenWire JMS Test - taken from ActiveMQ5 Add Core JMS Test Add AMQP Test Add Docs You can merge this pull request into a Git repository by running: $ git pull https://github.com/michaelandrepearce/activemq-artemis ARTEMIS-196 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2487.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2487 commit 3e4bc71796b202d94ca790f448d9ab15ec073208 Author: Michael André Pearce Date: 2018-12-31T13:22:02Z ARTEMIS-196 Implement Consumer Priority Add consumer priority support Includes refactor of consumer iterating in queueimpl to its own logical class, to be able to implement. Add OpenWire JMS Test - taken from ActiveMQ5 Add Core JMS Test Add AMQP Test Add Docs ---
[GitHub] activemq-artemis pull request #2486: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce closed the pull request at: https://github.com/apache/activemq-artemis/pull/2486 ---
[GitHub] activemq-artemis pull request #2486: ARTEMIS-196 Implement Consumer Priority
Github user michaelandrepearce closed the pull request at: https://github.com/apache/activemq-artemis/pull/2486 ---
[GitHub] activemq-artemis pull request #2486: ARTEMIS-196 Implement Consumer Priority
GitHub user michaelandrepearce reopened a pull request: https://github.com/apache/activemq-artemis/pull/2486 ARTEMIS-196 Implement Consumer Priority Add consumer priority support Includes refactor of consumer iterating in queueimpl to its own logical class, to be able to implement. Add OpenWire JMS Test Add Core JMS Test Add AMQP Test Add Docs You can merge this pull request into a Git repository by running: $ git pull https://github.com/michaelandrepearce/activemq-artemis ARTEMIS-196 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2486.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2486 commit 3e4bc71796b202d94ca790f448d9ab15ec073208 Author: Michael André Pearce Date: 2018-12-31T13:22:02Z ARTEMIS-196 Implement Consumer Priority Add consumer priority support Includes refactor of consumer iterating in queueimpl to its own logical class, to be able to implement. Add OpenWire JMS Test - taken from ActiveMQ5 Add Core JMS Test Add AMQP Test Add Docs ---
[GitHub] activemq-artemis pull request #2486: ARTEMIS-196 Implement Consumer Priority
GitHub user michaelandrepearce opened a pull request: https://github.com/apache/activemq-artemis/pull/2486 ARTEMIS-196 Implement Consumer Priority Add consumer priority support Includes refactor of consumer iterating in queueimpl to its own logical class, to be able to implement. Add OpenWire Test Add Core Test You can merge this pull request into a Git repository by running: $ git pull https://github.com/michaelandrepearce/activemq-artemis ARTEMIS-196 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2486.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2486 commit 0ed69c452fee817b735351db5dc450af246a27fa Author: Michael André Pearce Date: 2018-12-31T13:22:02Z ARTEMIS-196 Implement Consumer Priority Add consumer priority support Includes refactor of consumer iterating in queueimpl to its own logical class, to be able to implement. Add OpenWire Test Add Core Test ---
[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2483#discussion_r244757759 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java --- @@ -309,16 +309,17 @@ public void run() { */ @Override protected void performCachedLargeMessageDeletes() { - for (Long largeMsgId : largeMessagesToDelete) { - SequentialFile msg = createFileForLargeMessage(largeMsgId, LargeMessageExtension.DURABLE); + for (LargeServerMessage largeServerMessage : largeMessagesToDelete.values()) { --- End diff -- If you wish the collection itself to be iterable, then please add this functionality to the LongConcurrentHashMap implementation; it shouldn't be too hard, as it already has a forEach method ---
[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2483#discussion_r244756852 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java --- @@ -309,16 +309,17 @@ public void run() { */ @Override protected void performCachedLargeMessageDeletes() { - for (Long largeMsgId : largeMessagesToDelete) { - SequentialFile msg = createFileForLargeMessage(largeMsgId, LargeMessageExtension.DURABLE); + for (LargeServerMessage largeServerMessage : largeMessagesToDelete.values()) { --- End diff -- Calling values actually creates a new List, if you're iterating the objects, simply call using forEach method on the collection. ---
[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2483#discussion_r244686021 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java --- @@ -193,7 +193,7 @@ public static JournalContent getType(byte type) { protected final Map mapPersistedAddressSettings = new ConcurrentHashMap<>(); - protected final Set largeMessagesToDelete = new HashSet<>(); + protected final Map largeMessagesToDelete = new HashMap<>(); --- End diff -- @CNNJYB we have our own org.apache.activemq.artemis.utils.collections.LongConcurrentHashMap, this allows it to be concurrent safe, and also means it can be a primitive long. ---
[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2483#discussion_r244685607 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java --- @@ -461,8 +462,7 @@ void deleteLargeMessageFile(final LargeServerMessage largeServerMessage) throws try { if (isReplicated() && replicator.isSynchronizing()) { synchronized (largeMessagesToDelete) { --- End diff -- @franz1981 looking at the code, looks like its just vanilla HM protected final Map largeMessagesToDelete = new HashMap<>(); ---
[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...
Github user CNNJYB commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2483#discussion_r244657469 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java --- @@ -193,7 +193,7 @@ public static JournalContent getType(byte type) { protected final Map mapPersistedAddressSettings = new ConcurrentHashMap<>(); - protected final Set largeMessagesToDelete = new HashSet<>(); + protected final Map largeMessagesToDelete = new ConcurrentHashMap<>(); --- End diff -- @franz1981 modified, please review, Thanks. ---
[GitHub] activemq-artemis pull request #2485: ARTEMIS-2217 ‘MQTTSessionState’ in ...
GitHub user onlyMIT opened a pull request: https://github.com/apache/activemq-artemis/pull/2485 ARTEMIS-2217 ‘MQTTSessionState’ in the ‘SESSIONS ConcurrentHashMap’ n… …ever be removed ‘MQTTSessionState’ in the ‘SESSIONS ConcurrentHashMap’ should be removed when the consumer (cleanSession is true) connection is closed You can merge this pull request into a Git repository by running: $ git pull https://github.com/onlyMIT/activemq-artemis ARTEMIS-2217 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2485.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2485 commit c4c951c3a2cf74f211d9b7c17cad48f27b725ff5 Author: onlyMIT Date: 2018-12-29T08:53:04Z ARTEMIS-2217 ‘MQTTSessionState’ in the ‘SESSIONS ConcurrentHashMap’ never be removed ‘MQTTSessionState’ in the ‘SESSIONS ConcurrentHashMap’ should be removed when the consumer (cleanSession is true) connection is closed ---
[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2483#discussion_r244470771 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java --- @@ -461,8 +462,7 @@ void deleteLargeMessageFile(final LargeServerMessage largeServerMessage) throws try { if (isReplicated() && replicator.isSynchronizing()) { synchronized (largeMessagesToDelete) { --- End diff -- If it's now using CHM there is no need to sync on it ---
[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2483#discussion_r244470859 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java --- @@ -193,7 +193,7 @@ public static JournalContent getType(byte type) { protected final Map mapPersistedAddressSettings = new ConcurrentHashMap<>(); - protected final Set largeMessagesToDelete = new HashSet<>(); + protected final Map largeMessagesToDelete = new ConcurrentHashMap<>(); --- End diff -- It is possible to use a primitive version of the map ie using primitive longs instead of boxed types ---
[GitHub] activemq-artemis pull request #2484: Use a specific executor for pageSyncTim...
GitHub user qihongxu opened a pull request: https://github.com/apache/activemq-artemis/pull/2484 Use a specific executor for pageSyncTimer Improve paging throughput by using a specific executor for pageSyncTimer Improving throughput in paging mode is one of our concerns, since our cluster uses paging a lot. We found that pageSyncTimer in PagingStoreImpl shared the same executor from the thread pool with pageCursorProvider. In heavy-load scenarios, such as hundreds of consumers receiving messages simultaneously, it became difficult for pageSyncTimer to get time on that executor because of the contention. As a result, page sync was delayed and producers suffered low throughput. To achieve higher performance we assign a dedicated executor to pageSyncTimer to avoid this contention, and we ran a small-scale test on a single modified broker.
Broker: 4C/8G/500G SSD
Producer: 200 threads, non-transactional send
Consumer: 200 threads, transactional receive
Message text size: 100-200 bytes randomly
AddressFullPolicy: PAGE
Test result:
| Version | Only Send TPS | Only Receive TPS | Send/Receive TPS |
| -- | -- | -- | -- |
| Original ver | 38k | 33k | 3k/30k |
| Modified ver | 38k | 34k | 30k/12.5k |
The table above shows that on the modified broker, send TPS under concurrent load improves from "poor" (3k) to "extremely fast" (30k), while receive TPS drops from "extremely fast" (30k) to "not-bad" (12.5k). Considering that consumer systems usually have a long processing chain after receiving messages, we don't need receive TPS to be that fast. Instead, we want to guarantee send TPS so that we can cope with traffic peaks and lower the producer's delay. Moreover, total send plus receive TPS rises from 33k to about 43k. Given all of the above, this trade-off seems beneficial and acceptable.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/qihongxu/activemq-artemis pageSyncTimer_executor Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2484.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2484 commit 01a09f2f2bee98643df06a4eb93588047fea6527 Author: Qihong Xu Date: 2018-12-28T11:59:41Z Use a specific executor for pageSyncTimer ---
[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...
GitHub user CNNJYB opened a pull request: https://github.com/apache/activemq-artemis/pull/2483 ARTEMIS-2215 largemessage have been consumed but not deleted During backup and live synchronization, the client consumes a large message and then the live broker crashes (so the performCachedLargeMessageDeletes method is not executed). After the live broker starts up again, the large messages that have already been consumed are not deleted from the disk. You can merge this pull request into a Git repository by running: $ git pull https://github.com/CNNJYB/activemq-artemis dev-largemessagenotdelete Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2483.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2483 commit bf28c751af152956d1a12aa2237502c0a14fc4e8 Author: yb <17061955@...> Date: 2018-12-29T08:09:48Z ARTEMIS-2215 largemessage have been consumed but not deleted from the disk during backup and live sync ---
[GitHub] activemq-artemis pull request #2482: ARTEMIS-2214 Cache durable in ...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2482#discussion_r244150563 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java --- @@ -74,6 +74,10 @@ private long messageSize = -1; + private byte priority; + + private boolean durable; --- End diff -- This needs to default to "not set" somehow; possibly use a byte to represent the boolean, with 0 = false, 1 = true, -1 = not set ---
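The tri-state byte suggested in this review comment could look roughly like the sketch below. It assumes a lazily loaded flag; the class name, the `BooleanSupplier` standing in for the expensive `getMessage()` lookup, and the `loads` counter are all hypothetical and only there to make the caching visible.

```java
import java.util.function.BooleanSupplier;

// Hypothetical illustration of caching a boolean in a byte with a "not set" state.
final class CachedDurableSketch {

   private static final byte UNSET = -1;
   private static final byte FALSE = 0;
   private static final byte TRUE = 1;

   // Starts as UNSET so that a cached "false" can be told apart from "never loaded".
   private byte durable = UNSET;

   // Stand-in for the expensive lookup (for example, loading the paged message).
   private final BooleanSupplier loader;

   // Number of times the expensive lookup actually ran.
   int loads;

   CachedDurableSketch(BooleanSupplier loader) {
      this.loader = loader;
   }

   boolean isDurable() {
      if (durable == UNSET) {
         loads++;
         durable = loader.getAsBoolean() ? TRUE : FALSE;
      }
      return durable == TRUE;
   }
}
```

A plain `boolean` field cannot express the third state, which is why a non-durable message would otherwise be reloaded on every call.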
[GitHub] activemq-artemis pull request #2482: ARTEMIS-2214 Cache durable in ...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2482#discussion_r244150155 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java --- @@ -363,4 +371,20 @@ public long getPersistentSize() { return messageSize; } + @Override + public byte getPriority() { + if (priority == -1) { + priority = getMessage().getPriority(); + } + return priority; + } + + @Override + public boolean isDurable() { + if (messageID < 0) { --- End diff -- This should use the durable field or an additional flag to see whether it has been cached; messageID may be cached already while durable is not. ---
[GitHub] activemq-artemis pull request #2482: ARTEMIS-2214 Cache durable in ...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2482#discussion_r244149886 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java --- @@ -74,6 +74,10 @@ private long messageSize = -1; + private byte priority; --- End diff -- default this to -1 ---
[GitHub] activemq-artemis pull request #2482: ARTEMIS-2214 Cache durable in ...
GitHub user qihongxu opened a pull request: https://github.com/apache/activemq-artemis/pull/2482 ARTEMIS-2214 Cache durable in PagedReference We recently performed a test on an Artemis broker and found a severe performance issue. When paged messages are being consumed, decrementMetrics in QueuePendingMessageMetrics will call "getMessage" to check whether they are durable or not. As a result the queue stays locked for a long time, because the page may have been GCed and needs to be reloaded entirely. Other operations that rely on the queue are blocked during this time, which causes a significant TPS drop. Detailed stacks are attached below. This also happens when a consumer is closed and its messages are pushed back to the queue: Artemis checks the priority on return if these messages are paged. To solve the issue, durable and priority need to be cached in PagedReference, just like messageID, transactionID and so on. I have applied a patch to fix the issue. Any review is appreciated. You can merge this pull request into a Git repository by running: $ git pull https://github.com/qihongxu/activemq-artemis modify_pagedReference Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2482.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2482 commit a49ad880c2372afdb88bd805fb6e20fdae1de784 Author: Qihong Xu Date: 2018-12-26T03:11:10Z ARTEMIS-2214 Cache durable in PagedReference ---
[GitHub] activemq-artemis pull request #2481: ARTEMIS-2213 don't expire critical comp...
GitHub user wy96f opened a pull request: https://github.com/apache/activemq-artemis/pull/2481 ARTEMIS-2213 don't expire critical component in the case of clock back drift Some brokers in our production cluster crashed. There was nothing unusual in the stack dump. After digging into the code, we found that a component had been incorrectly expired. When the clock drifted back, the left time was less than the enter time. If the component was not entered in the default 12ms, it would be expired and the server was halted. You can merge this pull request into a Git repository by running: $ git pull https://github.com/wy96f/activemq-artemis incorrectly_expire_criticalcomponent Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2481.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2481 commit ca7cce59592a856dcf8438ff1fec7f7ae18d9e09 Author: yang wei Date: 2018-12-25T07:18:52Z ARTEMIS-2213 don't expire critical component in the case of clock back drift ---
[GitHub] activemq-artemis pull request #2478: ARTEMIS-2210 PagingStore creation is no...
Github user gaohoward commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2478#discussion_r243866712 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java --- @@ -335,19 +335,25 @@ public void deletePageStore(final SimpleString storeName) throws Exception { } /** -* stores is a ConcurrentHashMap, so we don't need to synchronize this method +* This method creates a new store if not exist. */ @Override public PagingStore getPageStore(final SimpleString storeName) throws Exception { if (managementAddress != null && storeName.startsWith(managementAddress)) { return null; } - PagingStore store = stores.get(storeName); - if (store != null) { - return store; + try { --- End diff -- ok got it. I've updated the branch. Please take a look again. thx. ---
[GitHub] activemq-artemis pull request #2480: ARTEMIS-2212 Avoid using CLQ on ServerC...
GitHub user franz1981 opened a pull request: https://github.com/apache/activemq-artemis/pull/2480 ARTEMIS-2212 Avoid using CLQ on ServerConsumerImpl It would deliver a better performance for the most common operations eg offer, poll, iterations, size. You can merge this pull request into a Git repository by running: $ git pull https://github.com/franz1981/activemq-artemis array_q_vs_clq Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2480.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2480 commit b5cbf969225f8d72592d7da65e60b999e5ac6882 Author: Francesco Nigro Date: 2018-12-12T16:47:33Z ARTEMIS-2212 Avoid using CLQ on ServerConsumerImpl It would deliver a better performance for the most common operations eg offer, poll, iterations, size. ---
[GitHub] activemq-artemis pull request #2478: ARTEMIS-2210 PagingStore creation is no...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2478#discussion_r243843391 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java --- @@ -335,19 +335,25 @@ public void deletePageStore(final SimpleString storeName) throws Exception { } /** -* stores is a ConcurrentHashMap, so we don't need to synchronize this method +* This method creates a new store if not exist. */ @Override public PagingStore getPageStore(final SimpleString storeName) throws Exception { if (managementAddress != null && storeName.startsWith(managementAddress)) { return null; } - PagingStore store = stores.get(storeName); - if (store != null) { - return store; + try { --- End diff -- I think it is better to keep `stores.get` on the fast path, as it was before: `chm::get` is completely lock-free, while `chm::computeIfAbsent` will lock at segment level, i.e. better to fall back to `chm::computeIfAbsent` only if `chm::get` returns `null` ---
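The pattern suggested in this comment, a plain `get` first and `computeIfAbsent` only on a miss, might be sketched as follows. This is an illustration on a bare `java.util.concurrent.ConcurrentHashMap`, not the code in `PagingManagerImpl`; `StoreRegistrySketch`, `getStore` and the `created` counter are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a manager that lazily creates one store per name.
final class StoreRegistrySketch {

   private final Map<String, Object> stores = new ConcurrentHashMap<>();

   // Counts factory invocations, to show a store is created once per key.
   final AtomicInteger created = new AtomicInteger();

   Object getStore(String name) {
      // Fast path: get() on a ConcurrentHashMap does not block.
      Object store = stores.get(name);
      if (store != null) {
         return store;
      }
      // Slow path: computeIfAbsent applies the factory atomically,
      // so concurrent callers for the same key end up with the same store.
      return stores.computeIfAbsent(name, key -> {
         created.incrementAndGet();
         return new Object();
      });
   }
}
```

Once a store exists, every later lookup for that name stays on the lock-free path; only the first lookup pays for the atomic creation.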
[GitHub] activemq-artemis pull request #2479: ARTEMIS-2211 Avoid duplicate code for B...
GitHub user franz1981 opened a pull request: https://github.com/apache/activemq-artemis/pull/2479 ARTEMIS-2211 Avoid duplicate code for ByteBuffer pooling and alignment Refactored thread local ByteBuffer pooling for both NIO & MAPPED seq file factories and used fast branchless alignment operation for ASYNCIO seq file factory, reusing an util class that was used just for the MAPPED journal. You can merge this pull request into a Git repository by running: $ git pull https://github.com/franz1981/activemq-artemis fast_pow_2mod Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2479.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2479 commit 541172fd0f45540193a95e708e914abd3adc9cb9 Author: Francesco Nigro Date: 2018-12-20T10:11:36Z ARTEMIS-2211 Avoid duplicate code for ByteBuffer pooling and alignment Refactored thread local ByteBuffer pooling for both NIO & MAPPED seq file factories and used fast branchless alignment operation for ASYNCIO seq file factory, reusing an util class that was used just for the MAPPED journal. ---
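For readers unfamiliar with the "fast branchless alignment operation" mentioned here: when the alignment is a power of two, rounding up can be done with an add and a mask instead of a modulus and a branch. The sketch below shows that general technique only. The util class reused by the PR is not shown in this message, so `PowerOfTwoAlignSketch` and `align` are hypothetical names.

```java
// Hypothetical illustration of branchless round-up to a power-of-two alignment.
final class PowerOfTwoAlignSketch {

   // Rounds value up to the next multiple of alignment.
   // alignment MUST be a positive power of two; overflow is not handled.
   static int align(int value, int alignment) {
      if (alignment <= 0 || (alignment & (alignment - 1)) != 0) {
         throw new IllegalArgumentException("alignment must be a positive power of two: " + alignment);
      }
      // Adding (alignment - 1) carries into the next multiple unless value is already aligned;
      // masking with -alignment (equal to ~(alignment - 1)) clears the low bits.
      return (value + (alignment - 1)) & -alignment;
   }
}
```

The argument check is only there to make the precondition explicit; the rounding itself is the single add-and-mask expression.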
[GitHub] activemq-artemis pull request #:
Github user franz1981 commented on the pull request: https://github.com/apache/activemq-artemis/commit/dfa70680fed37d25aa3a6d0d6a0795e580495b6a#commitcomment-31769915 No need to synchronize it: `stores` is a concurrent hashmap and you can use the atomic `computeIfAbsent`, which uses a lambda to populate the map ---
[GitHub] activemq-artemis pull request #2478: ARTEMIS-2210 PagingStore creation is no...
GitHub user gaohoward opened a pull request: https://github.com/apache/activemq-artemis/pull/2478 ARTEMIS-2210 PagingStore creation is not properly synchronized In PagingManagerImpl#getPageStore() the operations on the map 'stores' are not synchronized and it's possible that more than one paging store is created for one address. You can merge this pull request into a Git repository by running: $ git pull https://github.com/gaohoward/activemq-artemis a_2210 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2478.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2478 commit dfa70680fed37d25aa3a6d0d6a0795e580495b6a Author: Howard Gao Date: 2018-12-24T02:42:18Z ARTEMIS-2210 PagingStore creation is not properly synchronized In PagingManagerImpl#getPageStore() the operations on the map 'stores' are not synchronized and it's possible that more than one paging store is created for one address. ---
[GitHub] activemq-artemis pull request #2477: ARTEMIS-2190 move tests
GitHub user jbertram opened a pull request: https://github.com/apache/activemq-artemis/pull/2477 ARTEMIS-2190 move tests The "jms-tests" module is deprecated and these tests should have never gone in there. Moving them to the "integration-tests" module. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jbertram/activemq-artemis ARTEMIS-2190 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2477.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2477 commit 96bfc8ef29138d35c3f586a189478cef03c468b2 Author: Justin Bertram Date: 2018-12-21T15:15:45Z ARTEMIS-2190 move tests The "jms-tests" module is deprecated and these tests should have never gone in there. Moving them to the "integration-tests" module. ---
[GitHub] activemq-artemis pull request #2476: Fix deadlock while getting queue messag...
Github user wy96f closed the pull request at: https://github.com/apache/activemq-artemis/pull/2476 ---
[GitHub] activemq-artemis pull request #2476: Fix deadlock while getting queue messag...
GitHub user wy96f opened a pull request: https://github.com/apache/activemq-artemis/pull/2476

Fix deadlock while getting queue message count during cleanup

ARTEMIS-2123 introduced a deadlock bug. jstack shows:

Thread 1:
at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
at org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl.startPaging(PagingStoreImpl.java:481)
at org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl.addSize(PagingStoreImpl.java:739)
at org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl.nonDurableUp(PagingStoreImpl.java:952)
at org.apache.activemq.artemis.api.core.RefCountMessage.incrementRefCount(RefCountMessage.java:50)
at org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl.incrementDelayDeletionCount(LargeServerMessageImpl.java:149)
locked org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl@67359741
at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl$LargeMessageDeliverer.<init>(ServerConsumerImpl.java:1171)
at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl$LargeMessageDeliverer.<init>(ServerConsumerImpl.java:1151)
at org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.handle(ServerConsumerImpl.java:431)
locked java.lang.Object@3f31a7b3
at org.apache.activemq.artemis.core.server.impl.QueueImpl.handle(QueueImpl.java:2809)
at org.apache.activemq.artemis.core.server.impl.QueueImpl.deliver(QueueImpl.java:2196)
locked org.apache.activemq.artemis.core.server.impl.QueueImpl@6c2bd0dc
at org.apache.activemq.artemis.core.server.impl.QueueImpl.access$1900(QueueImpl.java:105)
at org.apache.activemq.artemis.core.server.impl.QueueImpl$DeliverRunner.run(QueueImpl.java:3001)
locked org.apache.activemq.artemis.core.server.impl.QueueImpl$DeliverRunner@79dea1f9

Thread 2:
at org.apache.activemq.artemis.core.server.impl.QueueImpl.getScheduledCount(QueueImpl.java:1085)
blocked on org.apache.activemq.artemis.core.server.impl.QueueImpl@742b7e17
at org.apache.activemq.artemis.core.server.impl.QueueImpl.getMessageCount(QueueImpl.java:1077)
at org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl.deliverIfNecessary(PageCursorProviderImpl.java:610)
at org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl.cleanup(PageCursorProviderImpl.java:365)
locked org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl@5aa5010
at org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl$1.run(PageCursorProviderImpl.java:288)

The cleanup thread held the pagingStore lock and requested the queue lock. The largeMessageDeliver thread held the queue lock and requested the pagingStore lock, so a deadlock occurred. Move queue::getMessageCount outside of the pagingStore lock to fix the bug.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wy96f/activemq-artemis fix_deadlock_caused_by_getmessagecount

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2476.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2476

commit f3f63fc961ea9573a4b7caaf1fe19cec15fcac3a
Author: yang wei
Date: 2018-12-21T04:29:04Z

Fix deadlock while getting queue message count during cleanup

---
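The lock-ordering change described in #2476 can be illustrated with a minimal sketch. This is not the Artemis code: `LockOrderSketch` and `SketchQueue` are hypothetical stand-ins, the queue lock is modelled as the object's monitor, and the pagingStore lock is assumed to be a `ReentrantReadWriteLock` (suggested by the `ReadLock.lock` frame in the dump, but not confirmed here).

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for the cleanup path; not the Artemis classes. */
final class LockOrderSketch {

   /** Stand-in for a queue whose message count is guarded by its own monitor. */
   static final class SketchQueue {
      private long messageCount;

      synchronized void add(long n) {
         messageCount += n;
      }

      synchronized long getMessageCount() {
         return messageCount;
      }
   }

   // Assumed model of the paging store lock.
   private final ReentrantReadWriteLock storeLock = new ReentrantReadWriteLock();
   private final SketchQueue queue;

   LockOrderSketch(SketchQueue queue) {
      this.queue = queue;
   }

   /** Deadlock-prone shape: the queue monitor is requested while the store lock is held. */
   long cleanupHoldingStoreLock() {
      storeLock.writeLock().lock();
      try {
         return queue.getMessageCount();
      } finally {
         storeLock.writeLock().unlock();
      }
   }

   /** Shape described in the PR: read the count first, then take the store lock. */
   long cleanupReadingCountFirst() {
      long count = queue.getMessageCount(); // queue monitor is taken and released here
      storeLock.writeLock().lock();
      try {
         return count; // only store-side work runs under the store lock
      } finally {
         storeLock.writeLock().unlock();
      }
   }
}
```

In the second shape the cleanup path never holds both locks at once, so it cannot take part in a cycle with a delivery thread that takes the queue lock before the store lock. The trade-off is that the count may be slightly stale by the time the store lock is acquired.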
[GitHub] activemq-artemis pull request #2474: [ARTEMIS-1536]: Incorrect Journal files...
Github user ehsavoie commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2474#discussion_r243354412

--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---
@@ -163,13 +163,17 @@ protected void init(Configuration config, IOCriticalErrorListener criticalErrorL
       int fileSize = config.getJournalFileSize();
       // we need to correct the file size if its not a multiple of the alignement
-      int modulus = fileSize % journalFF.getAlignment();
-      if (modulus != 0) {
-         int difference = modulus;
-         int low = config.getJournalFileSize() - difference;
-         int high = low + journalFF.getAlignment();
-         fileSize = difference < journalFF.getAlignment() / 2 ? low : high;
-         ActiveMQServerLogger.LOGGER.invalidJournalFileSize(config.getJournalFileSize(), fileSize, journalFF.getAlignment());
+      if (fileSize <= journalFF.getAlignment()) {
+         fileSize = journalFF.getAlignment();
--- End diff --

Done

---
[GitHub] activemq-artemis pull request #2474: [ARTEMIS-1536]: Incorrect Journal files...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2474#discussion_r243338608

--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---
@@ -163,13 +163,17 @@ protected void init(Configuration config, IOCriticalErrorListener criticalErrorL
       int fileSize = config.getJournalFileSize();
       // we need to correct the file size if its not a multiple of the alignement
-      int modulus = fileSize % journalFF.getAlignment();
-      if (modulus != 0) {
-         int difference = modulus;
-         int low = config.getJournalFileSize() - difference;
-         int high = low + journalFF.getAlignment();
-         fileSize = difference < journalFF.getAlignment() / 2 ? low : high;
-         ActiveMQServerLogger.LOGGER.invalidJournalFileSize(config.getJournalFileSize(), fileSize, journalFF.getAlignment());
+      if (fileSize <= journalFF.getAlignment()) {
+         fileSize = journalFF.getAlignment();
--- End diff --

I will just merge it... but if you can add a unit test...

---
[GitHub] activemq-artemis pull request #2474: [ARTEMIS-1536]: Incorrect Journal files...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2474#discussion_r243338301

--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---
@@ -163,13 +163,17 @@ protected void init(Configuration config, IOCriticalErrorListener criticalErrorL
       int fileSize = config.getJournalFileSize();
       // we need to correct the file size if its not a multiple of the alignement
-      int modulus = fileSize % journalFF.getAlignment();
-      if (modulus != 0) {
-         int difference = modulus;
-         int low = config.getJournalFileSize() - difference;
-         int high = low + journalFF.getAlignment();
-         fileSize = difference < journalFF.getAlignment() / 2 ? low : high;
-         ActiveMQServerLogger.LOGGER.invalidJournalFileSize(config.getJournalFileSize(), fileSize, journalFF.getAlignment());
+      if (fileSize <= journalFF.getAlignment()) {
+         fileSize = journalFF.getAlignment();
--- End diff --

Let's not complicate things... but a unit test would be nice :)

---
[GitHub] activemq-artemis pull request #2474: [ARTEMIS-1536]: Incorrect Journal files...
Github user ehsavoie commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2474#discussion_r243331449

--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---
@@ -163,13 +163,17 @@ protected void init(Configuration config, IOCriticalErrorListener criticalErrorL
       int fileSize = config.getJournalFileSize();
       // we need to correct the file size if its not a multiple of the alignement
-      int modulus = fileSize % journalFF.getAlignment();
-      if (modulus != 0) {
-         int difference = modulus;
-         int low = config.getJournalFileSize() - difference;
-         int high = low + journalFF.getAlignment();
-         fileSize = difference < journalFF.getAlignment() / 2 ? low : high;
-         ActiveMQServerLogger.LOGGER.invalidJournalFileSize(config.getJournalFileSize(), fileSize, journalFF.getAlignment());
+      if (fileSize <= journalFF.getAlignment()) {
+         fileSize = journalFF.getAlignment();
--- End diff --

There is already such an exception when creating the journal ("java.lang.IllegalArgumentException: File size cannot be less than 1024 bytes Caused by: java.lang.IllegalArgumentException: File size cannot be less than 1024 bytes"). Would setting it to high be acceptable?

---
[GitHub] activemq-artemis pull request #2474: [ARTEMIS-1536]: Incorrect Journal files...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2474#discussion_r243283678

--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---
@@ -163,13 +163,17 @@ protected void init(Configuration config, IOCriticalErrorListener criticalErrorL
       int fileSize = config.getJournalFileSize();
       // we need to correct the file size if its not a multiple of the alignement
-      int modulus = fileSize % journalFF.getAlignment();
-      if (modulus != 0) {
-         int difference = modulus;
-         int low = config.getJournalFileSize() - difference;
-         int high = low + journalFF.getAlignment();
-         fileSize = difference < journalFF.getAlignment() / 2 ? low : high;
-         ActiveMQServerLogger.LOGGER.invalidJournalFileSize(config.getJournalFileSize(), fileSize, journalFF.getAlignment());
+      if (fileSize <= journalFF.getAlignment()) {
+         fileSize = journalFF.getAlignment();
--- End diff --

I would actually throw an exception here. I wouldn't expect a block size that low. If you specify fileSize == alignment, you can only have the header on the journal and nothing else.

---
[GitHub] activemq-artemis pull request #2475: ARTEMIS-2144 - tx begin failure in ra d...
GitHub user andytaylor opened a pull request: https://github.com/apache/activemq-artemis/pull/2475

ARTEMIS-2144 - tx begin failure in ra doesn't get cleaned up

https://issues.apache.org/jira/browse/ARTEMIS-2144

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/andytaylor/activemq-artemis ARTEMIS-2144-master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2475.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2475

commit cc316d18c39bf1ac015d6192532a148c90cda4b4
Author: andytaylor
Date: 2018-10-24T10:21:52Z

ARTEMIS-2144 - tx begin failure in ra doesn't get cleaned up

https://issues.apache.org/jira/browse/ARTEMIS-2144

---
[GitHub] activemq-artemis pull request #2474: [ARTEMIS-1536]: Incorrect Journal files...
Github user franz1981 commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2474#discussion_r243190151

--- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java ---
@@ -163,13 +163,17 @@ protected void init(Configuration config, IOCriticalErrorListener criticalErrorL
       int fileSize = config.getJournalFileSize();
       // we need to correct the file size if its not a multiple of the alignement
-      int modulus = fileSize % journalFF.getAlignment();
-      if (modulus != 0) {
-         int difference = modulus;
-         int low = config.getJournalFileSize() - difference;
-         int high = low + journalFF.getAlignment();
-         fileSize = difference < journalFF.getAlignment() / 2 ? low : high;
-         ActiveMQServerLogger.LOGGER.invalidJournalFileSize(config.getJournalFileSize(), fileSize, journalFF.getAlignment());
+      if (fileSize <= journalFF.getAlignment()) {
--- End diff --

If `alignment` is a power of 2 (and it should be, right?) you can use:

```
public static long align(final long value, final long alignment) {
   return (value + (alignment - 1)) & ~(alignment - 1);
}
```

That is in `BytesUtils::align`

---
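To make the suggestion above concrete, here is a small sketch of the round-up-by-mask helper with a few worked values. The method body is the one quoted in the comment; the wrapper class name `AlignSketch` is hypothetical, and the sketch assumes `alignment` is a power of two, as the comment does.

```java
/** Hypothetical wrapper around the helper quoted in the review comment. */
final class AlignSketch {

   /**
    * Rounds value up to the next multiple of alignment.
    * Only valid when alignment is a power of two, because the mask
    * ~(alignment - 1) then clears exactly the low-order remainder bits.
    */
   static long align(final long value, final long alignment) {
      return (value + (alignment - 1)) & ~(alignment - 1);
   }

   public static void main(String[] args) {
      System.out.println(align(100, 4096));  // below one block: rounds up to 4096
      System.out.println(align(4096, 4096)); // already aligned: stays 4096
      System.out.println(align(4097, 4096)); // just over one block: rounds up to 8192
      System.out.println(align(0, 4096));    // zero stays zero
   }
}
```

Unlike rounding to the nearest multiple, this always rounds up, so a non-zero size smaller than the alignment becomes one full block rather than 0. A size of 0 still maps to 0, so a separate minimum-size check would remain necessary.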
[GitHub] activemq-artemis pull request #2474: [ARTEMIS-1536]: Incorrect Journal files...
GitHub user ehsavoie opened a pull request: https://github.com/apache/activemq-artemis/pull/2474

[ARTEMIS-1536]: Incorrect Journal filesize calculation where specified size is less than the block size when using AIO.

* If the specified file size is under the fs block size then the resulting file size is 0. Setting it to the block size in this case.

Jira: https://issues.apache.org/jira/browse/ARTEMIS-1536

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ehsavoie/apache-activemq-artemis ARTEMIS-1536

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2474.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2474

commit 2e0e718d0c91906fbc0e9e0fa13ad85f7b58ca64
Author: Emmanuel Hugonnet
Date: 2018-12-20T08:11:38Z

[ARTEMIS-1536]: Incorrect Journal filesize calculation where specified size is less than the block size when using AIO.

* If the specified file size is under the fs block size then the resulting file size is 0. Setting it to the block size in this case.

Jira: https://issues.apache.org/jira/browse/ARTEMIS-1536

---
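The zero-size result described in #2474 can be reproduced with a short sketch of the nearest-multiple rounding that the review diffs on this pull request show being removed. The class and method names are hypothetical and the alignment is passed in as a plain `int`. The quoted diff shows only the new `fileSize <= alignment` guard, so applying the old rounding to larger sizes afterwards is an assumption.

```java
/** Hypothetical reproduction of the journal file size rounding; not the Artemis class. */
final class JournalSizeRoundingSketch {

   /** Nearest-multiple rounding, following the lines removed in the quoted diff. */
   static int roundToNearest(int fileSize, int alignment) {
      int modulus = fileSize % alignment;
      if (modulus == 0) {
         return fileSize; // already a multiple of the alignment
      }
      int low = fileSize - modulus;
      int high = low + alignment;
      // Sizes in the lower half of a block round down, which is 0 for the first block.
      return modulus < alignment / 2 ? low : high;
   }

   /** Guard proposed in the PR; the fall-through to the old rounding is an assumption. */
   static int roundWithMinimum(int fileSize, int alignment) {
      if (fileSize <= alignment) {
         return alignment;
      }
      return roundToNearest(fileSize, alignment);
   }
}
```

In this sketch, `roundToNearest(100, 4096)` gives 0, while `roundToNearest(3000, 4096)` gives 4096, so the zero result appears when the requested size is under half the alignment. With the guard, any size up to one block becomes exactly one block.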
[GitHub] activemq-artemis pull request #2473: ARTEMIS-1058 Jars in web tmp dir locked...
Github user gaohoward commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2473#discussion_r243139995 --- Diff: artemis-web/src/main/java/org/apache/activemq/artemis/component/WebServerComponent.java --- @@ -237,28 +236,39 @@ public void start() throws Exception { public void internalStop() throws Exception { server.stop(); if (webContexts != null) { + File tmpdir = null; + StringBuilder strBuilder = new StringBuilder(); + boolean found = false; for (WebAppContext context : webContexts) { tmpdir = context.getTempDirectory(); -if (tmpdir != null && !context.isPersistTempDirectory()) { +if (tmpdir != null && tmpdir.exists() && !context.isPersistTempDirectory()) { //tmpdir will be removed by deleteOnExit() - //somehow when broker is stopped and restarted quickly - //this tmpdir won't get deleted sometimes - boolean fileDeleted = TimeUtils.waitOnBoolean(false, 5000, tmpdir::exists); - - if (!fileDeleted) { - //because the execution order of shutdown hooks are - //not determined, so it's possible that the deleteOnExit - //is executed after this hook, in that case we force a delete. - FileUtil.deleteDirectory(tmpdir); - logger.debug("Force to delete temporary file on shutdown: " + tmpdir.getAbsolutePath()); - if (tmpdir.exists()) { - ActiveMQWebLogger.LOGGER.tmpFileNotDeleted(tmpdir); - } + //However because the URLClassLoader never release/close its opened + //jars the jar file won't be able to get deleted on Windows platform + //until after the process fully terminated. To fix this here arranges + //a separate process to try clean up the temp dir + FileUtil.deleteDirectory(tmpdir); + if (tmpdir.exists()) { + ActiveMQWebLogger.LOGGER.tmpFileNotDeleted(tmpdir); + strBuilder.append(tmpdir); + strBuilder.append(","); + found = true; } } } + + if (found) { +String artemisHome = System.getProperty("artemis.home"); --- End diff -- This "artemis.home" property is not defined or configured in the broker.xml file. It's only defined in artemis.profile and set as a system property in the artemis.cmd script (-Dartemis.home="$ARTEMIS_HOME"), so I can't get it from the other configurations. ---
[GitHub] activemq-artemis pull request #2473: ARTEMIS-1058 Jars in web tmp dir locked...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2473#discussion_r243083234 --- Diff: artemis-web/src/main/java/org/apache/activemq/artemis/component/WebServerComponent.java --- @@ -237,28 +236,39 @@ public void start() throws Exception { public void internalStop() throws Exception { server.stop(); if (webContexts != null) { + File tmpdir = null; + StringBuilder strBuilder = new StringBuilder(); + boolean found = false; for (WebAppContext context : webContexts) { tmpdir = context.getTempDirectory(); -if (tmpdir != null && !context.isPersistTempDirectory()) { +if (tmpdir != null && tmpdir.exists() && !context.isPersistTempDirectory()) { //tmpdir will be removed by deleteOnExit() - //somehow when broker is stopped and restarted quickly - //this tmpdir won't get deleted sometimes - boolean fileDeleted = TimeUtils.waitOnBoolean(false, 5000, tmpdir::exists); - - if (!fileDeleted) { - //because the execution order of shutdown hooks are - //not determined, so it's possible that the deleteOnExit - //is executed after this hook, in that case we force a delete. - FileUtil.deleteDirectory(tmpdir); - logger.debug("Force to delete temporary file on shutdown: " + tmpdir.getAbsolutePath()); - if (tmpdir.exists()) { - ActiveMQWebLogger.LOGGER.tmpFileNotDeleted(tmpdir); - } + //However because the URLClassLoader never release/close its opened + //jars the jar file won't be able to get deleted on Windows platform + //until after the process fully terminated. To fix this here arranges + //a separate process to try clean up the temp dir + FileUtil.deleteDirectory(tmpdir); + if (tmpdir.exists()) { + ActiveMQWebLogger.LOGGER.tmpFileNotDeleted(tmpdir); + strBuilder.append(tmpdir); + strBuilder.append(","); + found = true; } } } + + if (found) { +String artemisHome = System.getProperty("artemis.home"); --- End diff -- can't you get the property from other means? Fileconfiguration for instance? ---
[GitHub] activemq-artemis pull request #2470: Fixes for alerts from lgtm.com
Github user jbertram commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2470#discussion_r243076837 --- Diff: artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java --- @@ -216,63 +216,63 @@ private static void printPages(DescribeJournal describeJournal, if (pgStore != null) { folder = pgStore.getFolder(); --- End diff -- The not-null check here on `pgStore` indicates that `pgStore` may, in fact, be null. If it is null then it will trigger a `NullPointerException` almost straight away. ---
[GitHub] activemq-artemis pull request #2470: Fixes for alerts from lgtm.com
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2470#discussion_r243073035 --- Diff: artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java --- @@ -216,63 +216,63 @@ private static void printPages(DescribeJournal describeJournal, if (pgStore != null) { folder = pgStore.getFolder(); --- End diff -- There's a semantic change here, are you sure? ---
[GitHub] activemq-artemis pull request #2464: ARTEMIS-1859 Incorrect routing with AMQ...
Github user jbertram commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2464#discussion_r243026861 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java --- @@ -216,9 +219,23 @@ public RemotingConnection getRemotingConnection() { flow(); } - public RoutingType getRoutingType(Receiver receiver, SimpleString address) { + public RoutingType getRoutingType(Receiver receiver, SimpleString address, AMQPMessage message) { org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); - return target != null ? getRoutingType(target.getCapabilities(), address) : getRoutingType((Symbol[]) null, address); + // the target may be null or have no capabilities in the case of an anonymous producer + if (target != null && target.getCapabilities() != null) { --- End diff -- I'll rework this. Thanks for the feedback, @gemmellr. ---