[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245972322
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java
 ---
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+
+import 
org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Before;
+import org.junit.Test;
+
+public class QueueConsumerPriorityTest extends BasicOpenWireTest {
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+  super.setUp();
+  this.makeSureCoreQueueExist("QUEUE.A");
+   }
+   @Test
+   public void testQueueConsumerPriority() throws JMSException, 
InterruptedException {
+  connection.start();
+  Session consumerLowPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  Session consumerHighPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  assertNotNull(consumerHighPriority);
+  Session senderSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  String queueName = "QUEUE.A";
+  ActiveMQQueue low = new ActiveMQQueue(queueName + 
"?consumer.priority=1");
+  MessageConsumer lowConsumer = 
consumerLowPriority.createConsumer(low);
+
+  ActiveMQQueue high = new ActiveMQQueue(queueName + 
"?consumer.priority=2");
+  MessageConsumer highConsumer = 
consumerLowPriority.createConsumer(high);
+
+  ActiveMQQueue senderQueue = new ActiveMQQueue(queueName);
+
+  MessageProducer producer = senderSession.createProducer(senderQueue);
+
+  Message msg = senderSession.createTextMessage("test");
+  for (int i = 0; i < 1000; i++) {
+ producer.send(msg);
+ assertNotNull("null on iteration: " + i, 
highConsumer.receive(1000));
+  }
+  assertNull(lowConsumer.receive(2000));
--- End diff --

Would a receiveNoWait (either in or outside the loop) like the other tests 
be nicer than burning 2 seconds? Slow tests is a key reason eventually noone 
wants to runs the tests :)


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r245960681
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 ---
@@ -730,22 +793,29 @@ public int deliverMessage(MessageReference 
messageReference, int deliveryCount,
 
 if (preSettle) {
// Presettled means the client implicitly accepts any 
delivery we send it.
-   sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   try {
+  sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   } catch (Exception e) {
+  log.debug(e.getMessage(), e);
+   }
delivery.settle();
 } else {
sender.advance();
 }
 
 connection.flush();
  } finally {
-connection.unlock();
+synchronized (creditsLock) {
+   pending.decrementAndGet();
+}
+if (releaseRequired) {
+   ((NettyReadable) sendBuffer).getByteBuf().release();
+}
  }
+  } catch (Exception e) {
+ log.warn(e.getMessage(), e);
 
- return size;
-  } finally {
- if (releaseRequired) {
-((NettyReadable) sendBuffer).getByteBuf().release();
- }
+ // important todo: Error treatment
--- End diff --

Did you look over this? 


---


[GitHub] activemq-artemis pull request #2493: ARTEMIS-2223 when a new consumer is cre...

2019-01-08 Thread onlyMIT
GitHub user onlyMIT opened a pull request:

https://github.com/apache/activemq-artemis/pull/2493

ARTEMIS-2223 when a new consumer is created, no subscription is called.

In the 'MQTTTest.testCleanSession()' test method, when a new consumer is 
created, no subscription is called.Consumers need to subscribe before they 
consume the news.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onlyMIT/activemq-artemis ARTEMIS-2223

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2493.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2493


commit 2a6d230c4c8cd739090c5da58754a005d62e1a40
Author: onlyMIT 
Date:   2019-01-08T09:39:37Z

ARTEMIS-2223 when a new consumer is created, no subscription is called.

In the 'MQTTTest.testCleanSession()' test method, when a new consumer is 
created, no subscription is called.Consumers need to subscribe before they 
consume the news.




---


[GitHub] activemq-artemis pull request #2492: ARTEMIS-2222 why the position remains u...

2019-01-08 Thread CNNJYB
GitHub user CNNJYB opened a pull request:

https://github.com/apache/activemq-artemis/pull/2492

ARTEMIS- why the position remains unchanged if ignored is set to true

I am a bit confused about this, When CursorIterator:next is called during 
queue depage, if ignored is set to true, why the position remains unchanged.
  
if (!ignored) {
position = message.getPosition();
}   
For example, the client sends some messages to the topic subscriber ta 
(this topic has two subscribers ta and tb), every time tb depage continuous 
PagePositions that ignored are set to true will be traversed again.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/CNNJYB/activemq-artemis 
dev-CursorIterator-moveNext-ignored

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2492.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2492


commit 49f8bcd99640209c825c22e54a32c872850cc000
Author: andytaylor 
Date:   2018-10-24T10:21:52Z

ARTEMIS-2144 - tx begin failure in ra doesn't get cleaned up

https://issues.apache.org/jira/browse/ARTEMIS-2144

commit 7ae39f7d7218e7c5a93eaecb8f24af3d6186e6f3
Author: yb <17061955@...>
Date:   2019-01-08T09:14:18Z

ARTEMIS- why the position remains unchanged if ignored is set to true




---


[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...

2019-01-07 Thread onlyMIT
Github user onlyMIT commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2491#discussion_r245879352
  
--- Diff: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 ---
@@ -117,14 +118,11 @@ boolean getStopped() {
}
 
boolean isClean() {
-  return isClean;
+  return clean;
}
 
-   void setIsClean(boolean isClean) throws Exception {
-  this.isClean = isClean;
-  if (isClean) {
- clean();
-  }
+   void setClean(boolean clean) throws Exception {
+  this.clean = clean;
--- End diff --

It is necessary to call the "clean()" method to clean up old session 
information when creating a connection.
If it is not cleaned up, when the cleanSession of the last MQTT consumer is 
false, and the cleanSession of the connected MQTT consumer is true, the message 
in the old queue will be consumed, which is actually not allowed.
I think this is why calling "clean()" in the "setIsClean(boolean isClean)" 
method


---


[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...

2019-01-07 Thread onlyMIT
Github user onlyMIT commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2491#discussion_r245878622
  
--- Diff: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 ---
@@ -117,14 +118,11 @@ boolean getStopped() {
}
 
boolean isClean() {
-  return isClean;
+  return clean;
}
 
-   void setIsClean(boolean isClean) throws Exception {
-  this.isClean = isClean;
-  if (isClean) {
- clean();
-  }
+   void setClean(boolean clean) throws Exception {
+  this.clean = clean;
--- End diff --

It is necessary to call the "clean()" method to clean up old session 
information when creating a connection.
If it is not cleaned up, when the cleanSession of the last MQTT consumer is 
false, and the cleanSession of the connected MQTT consumer is true, the message 
in the old queue will be consumed, which is actually not allowed.  
I think this is why calling "clean()" in the "setIsClean(boolean isClean)" 
method


---


[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...

2019-01-07 Thread jbertram
GitHub user jbertram opened a pull request:

https://github.com/apache/activemq-artemis/pull/2491

ARTEMIS-2217 remove state on clean MQTT session disconnect



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jbertram/activemq-artemis ARTEMIS-2217

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2491.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2491


commit a4d0cf9ab42dd5076b78fb47e3425d11c917db53
Author: Justin Bertram 
Date:   2018-12-29T08:53:04Z

ARTEMIS-2217 remove state on clean MQTT session disconnect




---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2019-01-07 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r245777806
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
 ---
@@ -359,59 +371,77 @@ public boolean flowControl(ReadyListener 
readyListener) {
 
@Override
public void onRemoteOpen(Connection connection) throws Exception {
-  lock();
+  handler.requireHandler();
   try {
- try {
-initInternal();
- } catch (Exception e) {
-log.error("Error init connection", e);
- }
- if (!validateConnection(connection)) {
-connection.close();
- } else {
-connection.setContext(AMQPConnectionContext.this);
-connection.setContainer(containerId);
-connection.setProperties(connectionProperties);
-
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
-connection.open();
- }
-  } finally {
- unlock();
+ initInternal();
+  } catch (Exception e) {
+ log.error("Error init connection", e);
+  }
+  if (!validateConnection(connection)) {
+ connection.close();
+  } else {
+ connection.setContext(AMQPConnectionContext.this);
+ connection.setContainer(containerId);
+ connection.setProperties(connectionProperties);
+ 
connection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+ connection.open();
   }
   initialise();
 
- /*
- * This can be null which is in effect an empty map, also we 
really don't need to check this for in bound connections
- * but its here in case we add support for outbound connections.
- * */
+  /*
+  * This can be null which is in effect an empty map, also we really 
don't need to check this for in bound connections
+  * but its here in case we add support for outbound connections.
+  * */
   if (connection.getRemoteProperties() == null || 
!connection.getRemoteProperties().containsKey(CONNECTION_OPEN_FAILED)) {
  long nextKeepAliveTime = handler.tick(true);
  if (nextKeepAliveTime != 0 && scheduledPool != null) {
-scheduledPool.schedule(new Runnable() {
-   @Override
-   public void run() {
-  Long rescheduleAt = handler.tick(false);
-  if (rescheduleAt == null) {
- // this mean tick could not acquire a lock, we will 
just retry in 10 milliseconds.
- scheduledPool.schedule(this, 10, 
TimeUnit.MILLISECONDS);
-  } else if (rescheduleAt != 0) {
- scheduledPool.schedule(this, rescheduleAt - 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
-  }
-   }
-}, (nextKeepAliveTime - 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), TimeUnit.MILLISECONDS);
+scheduledPool.schedule(new ScheduleRunnable(), 
(nextKeepAliveTime - TimeUnit.NANOSECONDS.toMillis(System.nanoTime())), 
TimeUnit.MILLISECONDS);
  }
   }
}
 
+   class TickerRunnable implements Runnable {
+
+  final ScheduleRunnable scheduleRunnable;
+
+  TickerRunnable(ScheduleRunnable scheduleRunnable) {
+ this.scheduleRunnable = scheduleRunnable;
+  }
+
+  @Override
+  public void run() {
+ try {
+Long rescheduleAt = handler.tick(false);
+if (rescheduleAt == null) {
+   // this mean tick could not acquire a lock, we will just 
retry in 10 milliseconds.
+   scheduledPool.schedule(scheduleRunnable, 10, 
TimeUnit.MILLISECONDS);
+} else if (rescheduleAt != 0) {
+   scheduledPool.schedule(scheduleRunnable, rescheduleAt - 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), TimeUnit.MILLISECONDS);
+}
+ } catch (Exception e) {
+log.warn(e.getMessage(), e);
+ }
--- End diff --

I'm removing the catch here. I don't think we need to use specific loggers 
on generic handlers like this though. it was a generic handler.. but it's being 
removed.


---


[GitHub] activemq-artemis pull request #2475: ARTEMIS-2144 - tx begin failure in ra d...

2019-01-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/activemq-artemis/pull/2475


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-07 Thread michaelandrepearce
GitHub user michaelandrepearce opened a pull request:

https://github.com/apache/activemq-artemis/pull/2490

V2 196

@franz1981 an alternative so we don't have to have a copy of 
CopyOnWriteArrayList, it does mean on add or remove consumer we have to invoke 
toArray which causes a copy, but this is not on hot path, so i think we should 
be good, and avoids us having to clone a jvm class.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michaelandrepearce/activemq-artemis V2-196

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2490.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2490


commit d731ffe7288cb857fef1b97deff4b7dc18aeb6d7
Author: Michael André Pearce 
Date:   2018-12-31T13:22:02Z

ARTEMIS-196 Implement Consumer Priority

Add consumer priority support
Includes refactor of consumer iterating in QueueImpl to its own logical 
class, to be able to implement.
Add OpenWire JMS Test - taken from ActiveMQ5
Add Core JMS Test
Add AMQP Test
Add Docs

commit b0c775840fc98b5d3f5f3485802de3270c614d9a
Author: Michael André Pearce 
Date:   2019-01-05T09:48:24Z

Extract




---


[GitHub] activemq-artemis pull request #2489: ARTEMIS-2220 Fix PageCursorStressTest::...

2019-01-04 Thread franz1981
GitHub user franz1981 opened a pull request:

https://github.com/apache/activemq-artemis/pull/2489

ARTEMIS-2220 Fix PageCursorStressTest::testSimpleCursorWithFilter NPE

FakeQueue is not correctly setting the queue on its PageSubscription,
leading to fail the test due to NPEs when PageSubscription::getQueue
is being used.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/franz1981/activemq-artemis ARTEMIS-2220

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2489.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2489


commit 32cb5271bb4f61c23c27ed3b7a3cda512e2648fc
Author: Francesco Nigro 
Date:   2019-01-04T22:50:56Z

ARTEMIS-2220 Fix PageCursorStressTest::testSimpleCursorWithFilter NPE

FakeQueue is not correctly setting the queue on its PageSubscription,
leading to fail the test due to NPEs when PageSubscription::getQueue
is being used.




---


[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...

2019-01-04 Thread wy96f
Github user wy96f commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2484#discussion_r245252912
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 ---
@@ -278,21 +293,26 @@ public boolean isPaging() {
   lock.readLock().lock();
 
   try {
- if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
-return false;
- }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
-return isFull();
- }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) {
-return isFull();
- }
- return paging;
+ return isPagingDirtyRead();
   } finally {
  lock.readLock().unlock();
   }
}
 
+   @Override
+   public boolean isPagingDirtyRead() {
+  if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
--- End diff --

> Yes, but we can just volatile load once before checking its value 3 
times, on each call of isPagingDirtyRead

get it. nice catch :+1: 


---


[GitHub] activemq-artemis pull request #2481: ARTEMIS-2213 don't expire critical comp...

2019-01-04 Thread wy96f
Github user wy96f closed the pull request at:

https://github.com/apache/activemq-artemis/pull/2481


---


[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...

2019-01-04 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2484#discussion_r245239811
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 ---
@@ -278,21 +293,26 @@ public boolean isPaging() {
   lock.readLock().lock();
 
   try {
- if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
-return false;
- }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
-return isFull();
- }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) {
-return isFull();
- }
- return paging;
+ return isPagingDirtyRead();
   } finally {
  lock.readLock().unlock();
   }
}
 
+   @Override
+   public boolean isPagingDirtyRead() {
+  if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
--- End diff --

@wy96f what @franz1981 is trying to say, is we can do the volatile read 
just once, by adding one line e.g.


 AddressFullMessagePolicy addressFullMessagePolicy = 
this.addressFullMessagePolicy;
 if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
return false;
 }
 if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
return isFull();
 }
 if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) {
return isFull();
 }
 return paging;


---


[GitHub] activemq-artemis pull request #2482: ARTEMIS-2214 Cache durable.

2019-01-04 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2482#discussion_r245233815
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
 ---
@@ -120,14 +126,16 @@ public PagedReferenceImpl(final PagePosition position,
  this.largeMessage = message.getMessage().isLargeMessage() ? 
IS_LARGE_MESSAGE : IS_NOT_LARGE_MESSAGE;
  this.transactionID = message.getTransactionID();
  this.messageID = message.getMessage().getMessageID();
-
+ this.durable = message.getMessage().isDurable() ? IS_DURABLE : 
IS_NOT_DURABLE;
+ this.deliveryTime = 
message.getMessage().getScheduledDeliveryTime();
  //pre-cache the message size so we don't have to reload the 
message later if it is GC'd
  getPersistentSize();
   } else {
  this.largeMessage = UNDEFINED_IS_LARGE_MESSAGE;
  this.transactionID = -2;
  this.messageID = -1;
  this.messageSize = -1;
+ this.durable = UNDEFINED_IS_DURABLE;
--- End diff --

for completeness (its a nit) set deliveryTime to its undefined value here.


---


[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2484#discussion_r245224854
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 ---
@@ -278,21 +293,26 @@ public boolean isPaging() {
   lock.readLock().lock();
 
   try {
- if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
-return false;
- }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
-return isFull();
- }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) {
-return isFull();
- }
- return paging;
+ return isPagingDirtyRead();
   } finally {
  lock.readLock().unlock();
   }
}
 
+   @Override
+   public boolean isPagingDirtyRead() {
+  if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
--- End diff --

Yes, but we can just volatile load once before checking its value 3 times, 
on each call of  isPagingDirtyRead


---


[GitHub] activemq-artemis pull request #2482: ARTEMIS-2214 Cache durable in ...

2019-01-03 Thread wy96f
Github user wy96f commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2482#discussion_r245195927
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
 ---
@@ -120,6 +128,8 @@ public PagedReferenceImpl(final PagePosition position,
  this.largeMessage = message.getMessage().isLargeMessage() ? 
IS_LARGE_MESSAGE : IS_NOT_LARGE_MESSAGE;
  this.transactionID = message.getTransactionID();
  this.messageID = message.getMessage().getMessageID();
+ this.priority = message.getMessage().getPriority();
--- End diff --

deliveryTime can be set in the constructor like transactionID , messageID , 
etc :)


---


[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...

2019-01-03 Thread wy96f
Github user wy96f commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2484#discussion_r245183888
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 ---
@@ -278,21 +293,26 @@ public boolean isPaging() {
   lock.readLock().lock();
 
   try {
- if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
-return false;
- }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
-return isFull();
- }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) {
-return isFull();
- }
- return paging;
+ return isPagingDirtyRead();
   } finally {
  lock.readLock().unlock();
   }
}
 
+   @Override
+   public boolean isPagingDirtyRead() {
+  if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
--- End diff --

addressFullMessagePolicy would be changed if address setting is reapplied. 
so we need to load the value.


---


[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2484#discussion_r245095523
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
 ---
@@ -1350,7 +1350,7 @@ public synchronized boolean hasNext() {
 return true;
  }
 
- if (!pageStore.isPaging()) {
+ if (!pageStore.isPagingDirtyRead()) {
--- End diff --

Concern here is this ins't an async case.

Btw i cannot see the change in QueueImpl to use the new paging dirty read.


---


[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2484#discussion_r245094809
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 ---
@@ -278,21 +293,26 @@ public boolean isPaging() {
   lock.readLock().lock();
 
   try {
- if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
-return false;
- }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
-return isFull();
- }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) {
-return isFull();
- }
- return paging;
+ return isPagingDirtyRead();
   } finally {
  lock.readLock().unlock();
   }
}
 
+   @Override
+   public boolean isPagingDirtyRead() {
+  if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
--- End diff --

nice idea!


---


[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2483#discussion_r245090041
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 ---
@@ -309,16 +309,17 @@ public void run() {
 */
@Override
protected void performCachedLargeMessageDeletes() {
-  for (Long largeMsgId : largeMessagesToDelete) {
- SequentialFile msg = createFileForLargeMessage(largeMsgId, 
LargeMessageExtension.DURABLE);
+  for (LargeServerMessage largeServerMessage : 
largeMessagesToDelete.values()) {
--- End diff --

Usage of LongConcurrentHashMap  looks much better.


---


[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2484#discussion_r245048241
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 ---
@@ -278,21 +293,26 @@ public boolean isPaging() {
   lock.readLock().lock();
 
   try {
- if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
-return false;
- }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
-return isFull();
- }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) {
-return isFull();
- }
- return paging;
+ return isPagingDirtyRead();
   } finally {
  lock.readLock().unlock();
   }
}
 
+   @Override
+   public boolean isPagingDirtyRead() {
+  if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
--- End diff --

we can read it just once and save it in a local variable, avoiding 3 
volatile loads: same can be done on the original version too


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245030347
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi 
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent 
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to 
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of 
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a 
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so 
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the 
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling 
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl extends 
AbstractCollection implements QueueConsumers {
+
+   private final QueueConsumersIterator iterator = new 
QueueConsumersIterator<>(this, true);
+
+   private volatile Level[] levels;
+   private volatile int size;
+   private volatile T first;
+
+   private void setArray(Level[] array) {
+  this.levels = array;
+   }
+
+   private Level[] getArray() {
+  return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+  levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Level[] newLevelArrayInstance(int length) {
+  return (Level[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   @Override
+   public Set getPriorites() {
+  Level[] levels = getArray();
+  return 
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+  return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  iterator.reset();
+  return this;
+   }
+
+   @Override
+   public void forEach(Consumer action) {
+  Objects.requireNonNull(action);
+  Level[] current = getArray();
+  int len = current.length;
+  for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+  }
+   }
+
+   private Level getLevel(int level, boolean createIfMissing) {
+  Level[] current = getArray();
+  

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245029912
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -25,20 +26,24 @@
 
private SimpleString filterString;
 
+   private int priority;
--- End diff --

marking resolved.


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245029854
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.Iterator;
+
+public interface ResetableIterator extends Iterator {
+
+   /**
+* Resets the iterator so you can re-iterate over all elements.
+*
+* @return itself, this is just for convenience.
+*/
+   ResetableIterator reset();
--- End diff --

ill mark resolved then.


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245029745
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ---
@@ -2497,45 +2494,36 @@ private void deliver() {
 
 
   handled++;
-
+  consumers.reset();
   continue;
}
 
if (logger.isTraceEnabled()) {
   logger.trace("Queue " + this.getName() + " is delivering 
reference " + ref);
}
 
-   // If a group id is set, then this overrides the consumer 
chosen round-robin
+   final SimpleString groupID = extractGroupID(ref);
+   groupConsumer = getGroupConsumer(groupConsumer, groupID);
 
-   SimpleString groupID = extractGroupID(ref);
-
-   if (groupID != null) {
-  groupConsumer = groups.get(groupID);
-
-  if (groupConsumer != null) {
- consumer = groupConsumer;
-  }
-   }
-
-   if (exclusive && redistributor == null) {
-  consumer = consumerList.get(0).consumer;
+   if (groupConsumer != null) {
+  consumer = groupConsumer;
}
 
HandleStatus status = handle(ref, consumer);
 
if (status == HandleStatus.HANDLED) {
 
-  deliveriesInTransit.countUp();
-
-  handledconsumer = consumer;
-
-  removeMessageReference(holder, ref);
-
   if (redistributor == null) {
  handleMessageGroup(ref, consumer, groupConsumer, 
groupID);
   }
 
+  deliveriesInTransit.countUp();
+
+
+  removeMessageReference(holder, ref);
+  handledconsumer = consumer;
   handled++;
+  consumers.reset();
--- End diff --

resolving as discussed else where


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245029528
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi 
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent 
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to 
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of 
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a 
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so 
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the 
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling 
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl extends 
AbstractCollection implements QueueConsumers {
+
+   private final QueueConsumersIterator iterator = new 
QueueConsumersIterator<>(this, true);
+
+   private volatile Level[] levels;
+   private volatile int size;
+   private volatile T first;
+
+   private void setArray(Level[] array) {
+  this.levels = array;
+   }
+
+   private Level[] getArray() {
+  return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+  levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Level[] newLevelArrayInstance(int length) {
+  return (Level[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   @Override
+   public Set getPriorites() {
+  Level[] levels = getArray();
+  return 
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+  return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  iterator.reset();
+  return this;
+   }
+
+   @Override
+   public void forEach(Consumer action) {
+  Objects.requireNonNull(action);
+  Level[] current = getArray();
+  int len = current.length;
+  for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+  }
+   }
+
+   private Level getLevel(int level, boolean createIfMissing) {
+  Level[] current = getArray();
+  

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245014300
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.Iterator;
+
+public interface ResetableIterator extends Iterator {
+
+   /**
+* Resets the iterator so you can re-iterate over all elements.
+*
+* @return itself, this is just for convenience.
+*/
+   ResetableIterator reset();
--- End diff --

Got it, thanks!


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245013090
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi 
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent 
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to 
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of 
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a 
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so 
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the 
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling 
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl extends 
AbstractCollection implements QueueConsumers {
+
+   private final QueueConsumersIterator iterator = new 
QueueConsumersIterator<>(this, true);
+
+   private volatile Level[] levels;
+   private volatile int size;
+   private volatile T first;
+
+   private void setArray(Level[] array) {
+  this.levels = array;
+   }
+
+   private Level[] getArray() {
+  return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+  levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Level[] newLevelArrayInstance(int length) {
+  return (Level[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   @Override
+   public Set getPriorites() {
+  Level[] levels = getArray();
+  return 
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+  return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  iterator.reset();
+  return this;
+   }
+
+   @Override
+   public void forEach(Consumer action) {
+  Objects.requireNonNull(action);
+  Level[] current = getArray();
+  int len = current.length;
+  for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+  }
+   }
+
+   private Level getLevel(int level, boolean createIfMissing) {
+  Level[] current = getArray();
+  

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245011975
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.Iterator;
+
+public interface ResetableIterator extends Iterator {
+
+   /**
+* Resets the iterator so you can re-iterate over all elements.
+*
+* @return itself, this is just for convenience.
+*/
+   ResetableIterator reset();
--- End diff --

-1 we are not closing the iterator, nor would this go in a try resources 
block., we are simply resetting the iterator so it marks the endpos = startpos, 
so we continue to round robin, as successfully handled a message.


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245012214
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi 
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent 
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to 
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of 
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a 
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so 
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the 
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling 
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl extends 
AbstractCollection implements QueueConsumers {
+
+   private final QueueConsumersIterator iterator = new 
QueueConsumersIterator<>(this, true);
+
+   private volatile Level[] levels;
+   private volatile int size;
+   private volatile T first;
+
+   private void setArray(Level[] array) {
+  this.levels = array;
+   }
+
+   private Level[] getArray() {
+  return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+  levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Level[] newLevelArrayInstance(int length) {
+  return (Level[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   @Override
+   public Set getPriorites() {
+  Level[] levels = getArray();
+  return 
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+  return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  iterator.reset();
+  return this;
+   }
+
+   @Override
+   public void forEach(Consumer action) {
+  Objects.requireNonNull(action);
+  Level[] current = getArray();
+  int len = current.length;
+  for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+  }
+   }
+
+   private Level getLevel(int level, boolean createIfMissing) {
+  Level[] current = getArray();
+  

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245010184
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi 
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent 
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to 
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of 
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a 
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so 
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the 
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling 
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl extends 
AbstractCollection implements QueueConsumers {
+
+   private final QueueConsumersIterator iterator = new 
QueueConsumersIterator<>(this, true);
+
+   private volatile Level[] levels;
+   private volatile int size;
+   private volatile T first;
+
+   private void setArray(Level[] array) {
+  this.levels = array;
+   }
+
+   private Level[] getArray() {
+  return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+  levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Level[] newLevelArrayInstance(int length) {
+  return (Level[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   @Override
+   public Set getPriorites() {
+  Level[] levels = getArray();
+  return 
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+  return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  iterator.reset();
+  return this;
+   }
+
+   @Override
+   public void forEach(Consumer action) {
+  Objects.requireNonNull(action);
+  Level[] current = getArray();
+  int len = current.length;
+  for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+  }
+   }
+
+   private Level getLevel(int level, boolean createIfMissing) {
+  Level[] current = getArray();
+  int low = 

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245009418
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -95,6 +109,7 @@ public void encodeRest(final ActiveMQBuffer buffer) {
   buffer.writeNullableSimpleString(filterString);
   buffer.writeBoolean(browseOnly);
   buffer.writeBoolean(requiresResponse);
+  buffer.writeInt(priority);
--- End diff --

we plan to support int.


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245009864
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 ---
@@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext 
protonSender,
   return consumer;
}
 
+   private int getPriority(Map properties) {
+  Integer value = properties == null ? null : (Integer) 
properties.get(PRIORITY);
--- End diff --

Users in AMQP land will expect Integer.

Qpid 
-https://qpid.apache.org/releases/qpid-broker-j-7.0.6/book/Java-Broker-Runtime-Consumers.html#Java-Broker-Runtime-Consumers-Prioirty


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245008661
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ---
@@ -3080,45 +3053,20 @@ private boolean deliverDirect(final 
MessageReference ref) {
 return true;
  }
 
- int startPos = pos;
-
- int size = consumerList.size();
+ consumers.reset();
 
- while (true) {
-ConsumerHolder holder;
-if (redistributor == null) {
-   holder = consumerList.get(pos);
-} else {
-   holder = redistributor;
-}
+ while (consumers.hasNext() || redistributor != null) {
--- End diff --

Trying to keep to standard interfaces, this is the standard Iterator 
methods. Also at this point in the while we do not want to actually get the 
next, theres some timeouts and other checks needed to be done first, thus next 
is called a little later on.


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245009338
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -25,20 +26,24 @@
 
private SimpleString filterString;
 
+   private int priority;
--- End diff --

We should support int, as AMQP uses can use -2^31 to 2^31-1. 

This changes nothing on the Message size, or any space concerns, this is 
Only on consumer creation.


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245008000
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ---
@@ -2497,45 +2494,36 @@ private void deliver() {
 
 
   handled++;
-
+  consumers.reset();
   continue;
}
 
if (logger.isTraceEnabled()) {
   logger.trace("Queue " + this.getName() + " is delivering 
reference " + ref);
}
 
-   // If a group id is set, then this overrides the consumer 
chosen round-robin
+   final SimpleString groupID = extractGroupID(ref);
+   groupConsumer = getGroupConsumer(groupConsumer, groupID);
 
-   SimpleString groupID = extractGroupID(ref);
-
-   if (groupID != null) {
-  groupConsumer = groups.get(groupID);
-
-  if (groupConsumer != null) {
- consumer = groupConsumer;
-  }
-   }
-
-   if (exclusive && redistributor == null) {
-  consumer = consumerList.get(0).consumer;
+   if (groupConsumer != null) {
+  consumer = groupConsumer;
}
 
HandleStatus status = handle(ref, consumer);
 
if (status == HandleStatus.HANDLED) {
 
-  deliveriesInTransit.countUp();
-
-  handledconsumer = consumer;
-
-  removeMessageReference(holder, ref);
-
   if (redistributor == null) {
  handleMessageGroup(ref, consumer, groupConsumer, 
groupID);
   }
 
+  deliveriesInTransit.countUp();
+
+
+  removeMessageReference(holder, ref);
+  handledconsumer = consumer;
   handled++;
+  consumers.reset();
--- End diff --

That is not this intent, the intent in reset is just to move the iterator 
markers, its not a resources its purpose. 


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245007330
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi 
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent 
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to 
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of 
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a 
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so 
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the 
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling 
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl extends 
AbstractCollection implements QueueConsumers {
+
+   private final QueueConsumersIterator iterator = new 
QueueConsumersIterator<>(this, true);
+
+   private volatile Level[] levels;
+   private volatile int size;
+   private volatile T first;
+
+   private void setArray(Level[] array) {
+  this.levels = array;
+   }
+
+   private Level[] getArray() {
+  return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+  levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Level[] newLevelArrayInstance(int length) {
+  return (Level[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   @Override
+   public Set getPriorites() {
+  Level[] levels = getArray();
+  return 
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+  return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  iterator.reset();
+  return this;
+   }
+
+   @Override
+   public void forEach(Consumer action) {
+  Objects.requireNonNull(action);
+  Level[] current = getArray();
+  int len = current.length;
+  for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+  }
+   }
+
+   private Level getLevel(int level, boolean createIfMissing) {
+  Level[] current = getArray();
+  

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245007052
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi 
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent 
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to 
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of 
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a 
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so 
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the 
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling 
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl extends 
AbstractCollection implements QueueConsumers {
+
+   private final QueueConsumersIterator iterator = new 
QueueConsumersIterator<>(this, true);
+
+   private volatile Level[] levels;
+   private volatile int size;
+   private volatile T first;
+
+   private void setArray(Level[] array) {
+  this.levels = array;
+   }
+
+   private Level[] getArray() {
+  return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+  levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Level[] newLevelArrayInstance(int length) {
+  return (Level[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   @Override
+   public Set getPriorites() {
+  Level[] levels = getArray();
+  return 
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+  return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  iterator.reset();
+  return this;
+   }
+
+   @Override
+   public void forEach(Consumer action) {
+  Objects.requireNonNull(action);
+  Level[] current = getArray();
+  int len = current.length;
+  for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+  }
+   }
+
+   private Level getLevel(int level, boolean createIfMissing) {
+  Level[] current = getArray();
+  

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245006670
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi 
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent 
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to 
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of 
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a 
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so 
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the 
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling 
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl extends 
AbstractCollection implements QueueConsumers {
+
+   private final QueueConsumersIterator iterator = new 
QueueConsumersIterator<>(this, true);
+
+   private volatile Level[] levels;
+   private volatile int size;
+   private volatile T first;
+
+   private void setArray(Level[] array) {
+  this.levels = array;
+   }
+
+   private Level[] getArray() {
+  return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+  levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Level[] newLevelArrayInstance(int length) {
+  return (Level[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   @Override
+   public Set getPriorites() {
+  Level[] levels = getArray();
+  return 
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+  return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  iterator.reset();
+  return this;
+   }
+
+   @Override
+   public void forEach(Consumer action) {
+  Objects.requireNonNull(action);
+  Level[] current = getArray();
+  int len = current.length;
+  for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+  }
+   }
+
+   private Level getLevel(int level, boolean createIfMissing) {
+  Level[] current = getArray();
+  

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245006519
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java
 ---
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.util.Collection;
+import java.util.Set;
+
+public interface QueueConsumers extends 
Collection {
+
+   Set getPriorites();
--- End diff --

Priority is of Integer type, not byte.


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245006326
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
 ---
@@ -21,7 +21,7 @@
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 
-public interface Consumer {
+public interface Consumer extends PriorityAware {
--- End diff --

Correct, but to make testing of the QueueConsumersImpl logic easier and 
quite self contained this, its kept seperate, also it means if we wanted to 
reuse the new QueueConsumerImpl for other PriorityAware needs it makes it 
easier.


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r244987340
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 ---
@@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext 
protonSender,
   return consumer;
}
 
+   private int getPriority(Map properties) {
+  Integer value = properties == null ? null : (Integer) 
properties.get(PRIORITY);
--- End diff --

I would use a cast to `Number` and call `Number::byteValue`


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r244986539
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -95,6 +109,7 @@ public void encodeRest(final ActiveMQBuffer buffer) {
   buffer.writeNullableSimpleString(filterString);
   buffer.writeBoolean(browseOnly);
   buffer.writeBoolean(requiresResponse);
+  buffer.writeInt(priority);
--- End diff --

I will write it as a byte, if we don't plan to support more then 127 
priorities, consumer-side: but as I've said is a negligible save of space on 
the wire


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r244986350
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -25,20 +26,24 @@
 
private SimpleString filterString;
 
+   private int priority;
--- End diff --

I will change it as `byte` if we are not planning to support priorities > 
127 ie `Byte::MAX_VALUE`
For small messages it *could* save some space (TBH to be verified with JOL 
tool)


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r244985399
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.Iterator;
+
+public interface ResetableIterator extends Iterator {
+
+   /**
+* Resets the iterator so you can re-iterate over all elements.
+*
+* @return itself, this is just for convenience.
+*/
+   ResetableIterator reset();
--- End diff --

We can extends `AutoCloseable` too and override `close`  in order to call 
`reset` by default too


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r244984864
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ---
@@ -3080,45 +3053,20 @@ private boolean deliverDirect(final 
MessageReference ref) {
 return true;
  }
 
- int startPos = pos;
-
- int size = consumerList.size();
+ consumers.reset();
 
- while (true) {
-ConsumerHolder holder;
-if (redistributor == null) {
-   holder = consumerList.get(pos);
-} else {
-   holder = redistributor;
-}
+ while (consumers.hasNext() || redistributor != null) {
--- End diff --

Just thinking loud: given that `consumers::hasNext` is mostly used with a 
`next` after it, why not provide just a `pollNext` method that return the 
consumer or `null` if there isn't any? 


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r244984274
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ---
@@ -2497,45 +2494,36 @@ private void deliver() {
 
 
   handled++;
-
+  consumers.reset();
   continue;
}
 
if (logger.isTraceEnabled()) {
   logger.trace("Queue " + this.getName() + " is delivering 
reference " + ref);
}
 
-   // If a group id is set, then this overrides the consumer 
chosen round-robin
+   final SimpleString groupID = extractGroupID(ref);
+   groupConsumer = getGroupConsumer(groupConsumer, groupID);
 
-   SimpleString groupID = extractGroupID(ref);
-
-   if (groupID != null) {
-  groupConsumer = groups.get(groupID);
-
-  if (groupConsumer != null) {
- consumer = groupConsumer;
-  }
-   }
-
-   if (exclusive && redistributor == null) {
-  consumer = consumerList.get(0).consumer;
+   if (groupConsumer != null) {
+  consumer = groupConsumer;
}
 
HandleStatus status = handle(ref, consumer);
 
if (status == HandleStatus.HANDLED) {
 
-  deliveriesInTransit.countUp();
-
-  handledconsumer = consumer;
-
-  removeMessageReference(holder, ref);
-
   if (redistributor == null) {
  handleMessageGroup(ref, consumer, groupConsumer, 
groupID);
   }
 
+  deliveriesInTransit.countUp();
+
+
+  removeMessageReference(holder, ref);
+  handledconsumer = consumer;
   handled++;
+  consumers.reset();
--- End diff --

will try to put the `consumers.reset` into a `try..finally` block or 
wrapping the reset call into an `AutoCloseable::close` method to force a 
`try-with-resources` usage too


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r244982460
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi 
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent 
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to 
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of 
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a 
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so 
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the 
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling 
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl extends 
AbstractCollection implements QueueConsumers {
+
+   private final QueueConsumersIterator iterator = new 
QueueConsumersIterator<>(this, true);
+
+   private volatile Level[] levels;
+   private volatile int size;
+   private volatile T first;
+
+   private void setArray(Level[] array) {
+  this.levels = array;
+   }
+
+   private Level[] getArray() {
+  return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+  levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Level[] newLevelArrayInstance(int length) {
+  return (Level[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   @Override
+   public Set getPriorites() {
+  Level[] levels = getArray();
+  return 
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+  return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  iterator.reset();
+  return this;
+   }
+
+   @Override
+   public void forEach(Consumer action) {
+  Objects.requireNonNull(action);
+  Level[] current = getArray();
+  int len = current.length;
+  for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+  }
+   }
+
+   private Level getLevel(int level, boolean createIfMissing) {
+  Level[] current = getArray();
+  int low = 

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r244980096
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi 
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent 
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to 
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of 
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a 
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so 
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the 
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling 
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl extends 
AbstractCollection implements QueueConsumers {
+
+   private final QueueConsumersIterator iterator = new 
QueueConsumersIterator<>(this, true);
+
+   private volatile Level[] levels;
+   private volatile int size;
+   private volatile T first;
+
+   private void setArray(Level[] array) {
+  this.levels = array;
+   }
+
+   private Level[] getArray() {
+  return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+  levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Level[] newLevelArrayInstance(int length) {
+  return (Level[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   @Override
+   public Set getPriorites() {
+  Level[] levels = getArray();
+  return 
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+  return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  iterator.reset();
+  return this;
+   }
+
+   @Override
+   public void forEach(Consumer action) {
+  Objects.requireNonNull(action);
+  Level[] current = getArray();
+  int len = current.length;
+  for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+  }
+   }
+
+   private Level getLevel(int level, boolean createIfMissing) {
+  Level[] current = getArray();
+  int low = 

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r244979227
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi 
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent 
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to 
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of 
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a 
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so 
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the 
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling 
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl extends 
AbstractCollection implements QueueConsumers {
+
+   private final QueueConsumersIterator iterator = new 
QueueConsumersIterator<>(this, true);
+
+   private volatile Level[] levels;
+   private volatile int size;
+   private volatile T first;
+
+   private void setArray(Level[] array) {
+  this.levels = array;
+   }
+
+   private Level[] getArray() {
+  return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+  levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Level[] newLevelArrayInstance(int length) {
+  return (Level[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   @Override
+   public Set getPriorites() {
+  Level[] levels = getArray();
+  return 
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+  return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  iterator.reset();
+  return this;
+   }
+
+   @Override
+   public void forEach(Consumer action) {
+  Objects.requireNonNull(action);
+  Level[] current = getArray();
+  int len = current.length;
+  for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+  }
+   }
+
+   private Level getLevel(int level, boolean createIfMissing) {
+  Level[] current = getArray();
+  int low = 

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r244979032
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi 
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent 
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to 
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of 
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a 
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so 
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the 
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling 
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl extends 
AbstractCollection implements QueueConsumers {
+
+   private final QueueConsumersIterator iterator = new 
QueueConsumersIterator<>(this, true);
+
+   private volatile Level[] levels;
+   private volatile int size;
+   private volatile T first;
+
+   private void setArray(Level[] array) {
+  this.levels = array;
+   }
+
+   private Level[] getArray() {
+  return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+  levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Level[] newLevelArrayInstance(int length) {
+  return (Level[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   @Override
+   public Set getPriorites() {
+  Level[] levels = getArray();
+  return 
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+  return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  iterator.reset();
+  return this;
+   }
+
+   @Override
+   public void forEach(Consumer action) {
+  Objects.requireNonNull(action);
+  Level[] current = getArray();
+  int len = current.length;
+  for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+  }
+   }
+
+   private Level getLevel(int level, boolean createIfMissing) {
+  Level[] current = getArray();
+  int low = 

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r244978399
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java
 ---
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.util.Collection;
+import java.util.Set;
+
+public interface QueueConsumers extends 
Collection {
+
+   Set getPriorites();
--- End diff --

I know that's just a test method, so feel free to ignore me, but I would 
just use a byte[] here :P


---


[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2483#discussion_r244977454
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 ---
@@ -727,7 +725,7 @@ private void checkAndCreateDir(final File dir, final 
boolean create) {
   List idList = new ArrayList<>();
   for (String filename : filenames) {
  Long id = getLargeMessageIdFromFilename(filename);
--- End diff --

you can just use a primitive `long` here


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r244976790
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
 ---
@@ -21,7 +21,7 @@
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 
-public interface Consumer {
+public interface Consumer extends PriorityAware {
--- End diff --

Just a design q: why using a specific `PriorityAware` interface?
I'm assuming that we can't have anymore a `Consumer` that doesn't provide a 
`default int getPriority()` 


---


[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...

2019-01-03 Thread CNNJYB
Github user CNNJYB commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2483#discussion_r244976446
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 ---
@@ -309,16 +309,17 @@ public void run() {
 */
@Override
protected void performCachedLargeMessageDeletes() {
-  for (Long largeMsgId : largeMessagesToDelete) {
- SequentialFile msg = createFileForLargeMessage(largeMsgId, 
LargeMessageExtension.DURABLE);
+  for (LargeServerMessage largeServerMessage : 
largeMessagesToDelete.values()) {
--- End diff --

@michaelandrepearce update, please review, Thanks.


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
GitHub user michaelandrepearce opened a pull request:

https://github.com/apache/activemq-artemis/pull/2488

ARTEMIS-196 Implement Consumer Priority

Add consumer priority support
Includes refactor of consumer iterating in QueueImpl to its own logical 
class, to be able to implement.
Add OpenWire JMS Test - taken from ActiveMQ5
Add Core JMS Test
Add AMQP Test
Add Docs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michaelandrepearce/activemq-artemis 
ARTEMIS-196

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2488.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2488


commit 61a91701f3d424d31a83d9942f7786c90ac81559
Author: Michael André Pearce 
Date:   2018-12-31T13:22:02Z

ARTEMIS-196 Implement Consumer Priority

Add consumer priority support
Includes refactor of consumer iterating in QueueImpl to its own logical 
class, to be able to implement.
Add OpenWire JMS Test - taken from ActiveMQ5
Add Core JMS Test
Add AMQP Test
Add Docs




---


[GitHub] activemq-artemis pull request #2487: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce closed the pull request at:

https://github.com/apache/activemq-artemis/pull/2487


---


[GitHub] activemq-artemis pull request #2487: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
GitHub user michaelandrepearce opened a pull request:

https://github.com/apache/activemq-artemis/pull/2487

ARTEMIS-196 Implement Consumer Priority

Add consumer priority support
Includes refactor of consumer iterating in queueimpl to its own logical 
class, to be able to implement.
Add OpenWire JMS Test - taken from ActiveMQ5
Add Core JMS Test
Add AMQP Test
Add Docs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michaelandrepearce/activemq-artemis 
ARTEMIS-196

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2487


commit 3e4bc71796b202d94ca790f448d9ab15ec073208
Author: Michael André Pearce 
Date:   2018-12-31T13:22:02Z

ARTEMIS-196 Implement Consumer Priority

Add consumer priority support
Includes refactor of consumer iterating in queueimpl to its own logical 
class, to be able to implement.
Add OpenWire JMS Test - taken from ActiveMQ5
Add Core JMS Test
Add AMQP Test
Add Docs




---


[GitHub] activemq-artemis pull request #2486: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce closed the pull request at:

https://github.com/apache/activemq-artemis/pull/2486


---


[GitHub] activemq-artemis pull request #2486: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce closed the pull request at:

https://github.com/apache/activemq-artemis/pull/2486


---


[GitHub] activemq-artemis pull request #2486: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
GitHub user michaelandrepearce reopened a pull request:

https://github.com/apache/activemq-artemis/pull/2486

ARTEMIS-196 Implement Consumer Priority

Add consumer priority support
Includes refactor of consumer iterating in queueimpl to its own logical 
class, to be able to implement.
Add OpenWire JMS Test
Add Core JMS Test
Add AMQP Test
Add Docs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michaelandrepearce/activemq-artemis 
ARTEMIS-196

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2486.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2486


commit 3e4bc71796b202d94ca790f448d9ab15ec073208
Author: Michael André Pearce 
Date:   2018-12-31T13:22:02Z

ARTEMIS-196 Implement Consumer Priority

Add consumer priority support
Includes refactor of consumer iterating in queueimpl to its own logical 
class, to be able to implement.
Add OpenWire JMS Test - taken from ActiveMQ5
Add Core JMS Test
Add AMQP Test
Add Docs




---


[GitHub] activemq-artemis pull request #2486: ARTEMIS-196 Implement Consumer Priority

2019-01-02 Thread michaelandrepearce
GitHub user michaelandrepearce opened a pull request:

https://github.com/apache/activemq-artemis/pull/2486

ARTEMIS-196 Implement Consumer Priority

Add consumer priority support
Includes refactor of consumer iterating in queueimpl to its own logical 
class, to be able to implement.
Add OpenWire Test
Add Core Test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michaelandrepearce/activemq-artemis 
ARTEMIS-196

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2486.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2486


commit 0ed69c452fee817b735351db5dc450af246a27fa
Author: Michael André Pearce 
Date:   2018-12-31T13:22:02Z

ARTEMIS-196 Implement Consumer Priority

Add consumer priority support
Includes refactor of consumer iterating in queueimpl to its own logical 
class, to be able to implement.
Add OpenWire Test
Add Core Test




---


[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...

2019-01-02 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2483#discussion_r244757759
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 ---
@@ -309,16 +309,17 @@ public void run() {
 */
@Override
protected void performCachedLargeMessageDeletes() {
-  for (Long largeMsgId : largeMessagesToDelete) {
- SequentialFile msg = createFileForLargeMessage(largeMsgId, 
LargeMessageExtension.DURABLE);
+  for (LargeServerMessage largeServerMessage : 
largeMessagesToDelete.values()) {
--- End diff --

If you wish the collection itself to be iterable, then please add this 
functionality to LongConcurrentHashMap implementation, it shouldn;t be too 
hard, as already it has a forEach method


---


[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...

2019-01-02 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2483#discussion_r244756852
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 ---
@@ -309,16 +309,17 @@ public void run() {
 */
@Override
protected void performCachedLargeMessageDeletes() {
-  for (Long largeMsgId : largeMessagesToDelete) {
- SequentialFile msg = createFileForLargeMessage(largeMsgId, 
LargeMessageExtension.DURABLE);
+  for (LargeServerMessage largeServerMessage : 
largeMessagesToDelete.values()) {
--- End diff --

Calling values actually creates a new List, if you're iterating the 
objects, simply call using forEach method on the collection.


---


[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...

2019-01-02 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2483#discussion_r244686021
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
 ---
@@ -193,7 +193,7 @@ public static JournalContent getType(byte type) {
 
protected final Map 
mapPersistedAddressSettings = new ConcurrentHashMap<>();
 
-   protected final Set largeMessagesToDelete = new HashSet<>();
+   protected final Map largeMessagesToDelete = 
new HashMap<>();
--- End diff --

@CNNJYB we have our own 
org.apache.activemq.artemis.utils.collections.LongConcurrentHashMap, this 
allows it to be concurrent safe, and also means it can be a primitive long.


---


[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...

2019-01-02 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2483#discussion_r244685607
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 ---
@@ -461,8 +462,7 @@ void deleteLargeMessageFile(final LargeServerMessage 
largeServerMessage) throws
  try {
 if (isReplicated() && replicator.isSynchronizing()) {
synchronized (largeMessagesToDelete) {
--- End diff --

@franz1981 looking at the code, looks like its just vanilla HM 

   protected final Map largeMessagesToDelete = 
new HashMap<>();



---


[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...

2019-01-01 Thread CNNJYB
Github user CNNJYB commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2483#discussion_r244657469
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
 ---
@@ -193,7 +193,7 @@ public static JournalContent getType(byte type) {
 
protected final Map 
mapPersistedAddressSettings = new ConcurrentHashMap<>();
 
-   protected final Set largeMessagesToDelete = new HashSet<>();
+   protected final Map largeMessagesToDelete = 
new ConcurrentHashMap<>();
--- End diff --

@franz1981 modified, please review, Thanks.


---


[GitHub] activemq-artemis pull request #2485: ARTEMIS-2217 ‘MQTTSessionState’ in ...

2018-12-29 Thread onlyMIT
GitHub user onlyMIT opened a pull request:

https://github.com/apache/activemq-artemis/pull/2485

ARTEMIS-2217 ‘MQTTSessionState’ in the ‘SESSIONS ConcurrentHashMap’ 
n…

…ever be removed

‘MQTTSessionState’ in the ‘SESSIONS ConcurrentHashMap’ should be 
removed when the conusmer (cleanSession is true) connection is closed

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onlyMIT/activemq-artemis ARTEMIS-2217

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2485.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2485


commit c4c951c3a2cf74f211d9b7c17cad48f27b725ff5
Author: onlyMIT 
Date:   2018-12-29T08:53:04Z

ARTEMIS-2217 ‘MQTTSessionState’ in the ‘SESSIONS ConcurrentHashMap’ 
never be removed

‘MQTTSessionState’ in the ‘SESSIONS ConcurrentHashMap’ should be 
removed when the conusmer (cleanSession is true) connection is closed




---


[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...

2018-12-29 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2483#discussion_r244470771
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 ---
@@ -461,8 +462,7 @@ void deleteLargeMessageFile(final LargeServerMessage 
largeServerMessage) throws
  try {
 if (isReplicated() && replicator.isSynchronizing()) {
synchronized (largeMessagesToDelete) {
--- End diff --

If it's now using CHM there is no need to sync on it


---


[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...

2018-12-29 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2483#discussion_r244470859
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
 ---
@@ -193,7 +193,7 @@ public static JournalContent getType(byte type) {
 
protected final Map 
mapPersistedAddressSettings = new ConcurrentHashMap<>();
 
-   protected final Set largeMessagesToDelete = new HashSet<>();
+   protected final Map largeMessagesToDelete = 
new ConcurrentHashMap<>();
--- End diff --

It is possible to use a primitive version of the map ie using primitive 
longs instead of boxed types


---


[GitHub] activemq-artemis pull request #2484: Use a specific executor for pageSyncTim...

2018-12-29 Thread qihongxu
GitHub user qihongxu opened a pull request:

https://github.com/apache/activemq-artemis/pull/2484

Use a specific executor for pageSyncTimer

Improve paging throughput by using a specific executor for pageSyncTimer

Improving throughput on paging mode is one of our concerns since our 
cluster uses paging a lot.
We found that pageSyncTimer in PagingStoreImpl shared the same executor 
with pageCursorProvider from thread pool. In heavy load scenario like hundreds 
of consumers receiving messages simultaneously, it became difficult for 
pageSyncTimer to get the executor due to race condition. Therefore page sync 
was delayed and producers suffered low throughput.

To achieve higher performance we assign a specific executor to 
pageSyncTimer to avoid racing. And we run a small-scale test on a single 
modified broker.

Broker: 4C/8G/500G SSD
Producer: 200 threads, non-transactional send
Consumer 200 threads, transactional receive
Message text size: 100-200 bytes randomly
AddressFullPolicy: PAGE

Test result:

  | Only Send TPS | Only Receive TPS | Send TPS
-- | -- | -- | --
Original ver | 38k | 33k | 3k/30k
Modified ver | 38k | 34k | 30k/12.5k


The chart above shows that on modified broker send TPS improves from 
“poor” to “extremely fast”, while receive TPS drops from “extremely 
fast” to “not-bad” under heavy load. Considering consumer systems usually 
have a long processing chain after receiving messages, we don’t need too fast 
receive TPS. Instead, we want to guarantee send TPS to cope with traffic peak 
and lower producer’s delay time. Moreover, send and receive TPS in total 
raises from 33k to about 43k. From all above this trade-off seems beneficial 
and acceptable.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/qihongxu/activemq-artemis 
pageSyncTimer_executor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2484.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2484


commit 01a09f2f2bee98643df06a4eb93588047fea6527
Author: Qihong Xu 
Date:   2018-12-28T11:59:41Z

Use a specific executor for pageSyncTimer




---


[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...

2018-12-29 Thread CNNJYB
GitHub user CNNJYB opened a pull request:

https://github.com/apache/activemq-artemis/pull/2483

ARTEMIS-2215 largemessage have been consumed but not deleted

During the backup and live synchronization, the client consumes the 
largemessage, then the live crash(the performCachedLargeMessageDeletes method 
is not executed), after the live startup, the largemessages that have been 
consumed are not deleted from the disk.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/CNNJYB/activemq-artemis 
dev-largemessagenotdelete

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2483.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2483


commit bf28c751af152956d1a12aa2237502c0a14fc4e8
Author: yb <17061955@...>
Date:   2018-12-29T08:09:48Z

ARTEMIS-2215 largemessage have been consumed but not deleted from the disk 
during backup and live sync




---


[GitHub] activemq-artemis pull request #2482: ARTEMIS-2214 Cache durable in ...

2018-12-27 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2482#discussion_r244150563
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
 ---
@@ -74,6 +74,10 @@
 
private long messageSize = -1;
 
+   private byte priority;
+
+   private boolean durable;
--- End diff --

need to default this not set somehow, possible use a byte to represent the 
boolean with 0 = false, 1 = true, -1 not set


---


[GitHub] activemq-artemis pull request #2482: ARTEMIS-2214 Cache durable in ...

2018-12-27 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2482#discussion_r244150155
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
 ---
@@ -363,4 +371,20 @@ public long getPersistentSize() {
   return messageSize;
}
 
+   @Override
+   public byte getPriority() {
+  if (priority == -1) {
+ priority = getMessage().getPriority();
+  }
+  return priority;
+   }
+
+   @Override
+   public boolean isDurable() {
+  if (messageID < 0) {
--- End diff --

should use durable field or additional flag to see if its been cached, 
messageID maybe cached already but not durable.


---


[GitHub] activemq-artemis pull request #2482: ARTEMIS-2214 Cache durable in ...

2018-12-27 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2482#discussion_r244149886
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
 ---
@@ -74,6 +74,10 @@
 
private long messageSize = -1;
 
+   private byte priority;
--- End diff --

default this to -1


---


[GitHub] activemq-artemis pull request #2482: ARTEMIS-2214 Cache durable in ...

2018-12-25 Thread qihongxu
GitHub user qihongxu opened a pull request:

https://github.com/apache/activemq-artemis/pull/2482

ARTEMIS-2214 Cache durable in PagedReference

We recently performed a test on artemis broker and found a severe 
performance issue.

When paged messages are being consumed, decrementMetrics in 
QueuePendingMessageMetrics will try to ‘getMessage’ to check whether they 
are durable or not. In this way queue will be locked for a long time because 
page may be GCed and need to be reload entirely. Other operations rely on queue 
will be blocked at this time, which cause a significant TPS drop. Detailed 
stacks are attached below.

This also happens when consumer is closed and messages are pushed back to 
the queue, artemis will check priority on return if these messages are paged.

To solve the issue, durable and priority need to be cached in 
PagedReference just like messageID, transactionID and so on. I have applied a 
patch to fix the issue. Any review is appreciated.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/qihongxu/activemq-artemis 
modify_pagedReference

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2482.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2482


commit a49ad880c2372afdb88bd805fb6e20fdae1de784
Author: Qihong Xu 
Date:   2018-12-26T03:11:10Z

ARTEMIS-2214 Cache durable in PagedReference




---


[GitHub] activemq-artemis pull request #2481: ARTEMIS-2213 don't expire critical comp...

2018-12-24 Thread wy96f
GitHub user wy96f opened a pull request:

https://github.com/apache/activemq-artemis/pull/2481

ARTEMIS-2213 don't expire critical component in the case of clock back drift

In our production cluster some brokers crashed. There is nothing unusual in 
the dump stack. After digging into code, we found component was incorrectly 
expired. When clock drifted back, left time was less than enter time. If the 
component was not entered in default 12ms, it would be expired and server 
was halted.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wy96f/activemq-artemis 
incorrectly_expire_criticalcomponent

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2481.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2481


commit ca7cce59592a856dcf8438ff1fec7f7ae18d9e09
Author: yang wei 
Date:   2018-12-25T07:18:52Z

ARTEMIS-2213 don't expire critical component in the case of clock back drift




---


[GitHub] activemq-artemis pull request #2478: ARTEMIS-2210 PagingStore creation is no...

2018-12-24 Thread gaohoward
Github user gaohoward commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2478#discussion_r243866712
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
 ---
@@ -335,19 +335,25 @@ public void deletePageStore(final SimpleString 
storeName) throws Exception {
}
 
/**
-* stores is a ConcurrentHashMap, so we don't need to synchronize this 
method
+* This method creates a new store if not exist.
 */
@Override
public PagingStore getPageStore(final SimpleString storeName) throws 
Exception {
   if (managementAddress != null && 
storeName.startsWith(managementAddress)) {
  return null;
   }
-  PagingStore store = stores.get(storeName);
 
-  if (store != null) {
- return store;
+  try {
--- End diff --

ok got it. I've updated the branch. Please take a look again. thx.


---


[GitHub] activemq-artemis pull request #2480: ARTEMIS-2212 Avoid using CLQ on ServerC...

2018-12-24 Thread franz1981
GitHub user franz1981 opened a pull request:

https://github.com/apache/activemq-artemis/pull/2480

ARTEMIS-2212 Avoid using CLQ on ServerConsumerImpl

It would deliver a better performance for the most
common operations eg offer, poll, iterations, size.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/franz1981/activemq-artemis array_q_vs_clq

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2480.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2480


commit b5cbf969225f8d72592d7da65e60b999e5ac6882
Author: Francesco Nigro 
Date:   2018-12-12T16:47:33Z

ARTEMIS-2212 Avoid using CLQ on ServerConsumerImpl

It would deliver a better performance for the most
common operations eg offer, poll, iterations, size.




---


[GitHub] activemq-artemis pull request #2478: ARTEMIS-2210 PagingStore creation is no...

2018-12-24 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2478#discussion_r243843391
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
 ---
@@ -335,19 +335,25 @@ public void deletePageStore(final SimpleString 
storeName) throws Exception {
}
 
/**
-* stores is a ConcurrentHashMap, so we don't need to synchronize this 
method
+* This method creates a new store if not exist.
 */
@Override
public PagingStore getPageStore(final SimpleString storeName) throws 
Exception {
   if (managementAddress != null && 
storeName.startsWith(managementAddress)) {
  return null;
   }
-  PagingStore store = stores.get(storeName);
 
-  if (store != null) {
- return store;
+  try {
--- End diff --

I think that `stores.get` is better that will be used in the fast path as 
it was before: `chm:.get` is completly lock-free, while `chm::computeIfAbsent` 
will lock on segment level ie better to fallback to `chm::computeIfAbsent` only 
if `chm::get` return `null`


---


[GitHub] activemq-artemis pull request #2479: ARTEMIS-2211 Avoid duplicate code for B...

2018-12-24 Thread franz1981
GitHub user franz1981 opened a pull request:

https://github.com/apache/activemq-artemis/pull/2479

ARTEMIS-2211 Avoid duplicate code for ByteBuffer pooling and alignment

Refactored thread local ByteBuffer pooling for both NIO & MAPPED
seq file factories and used fast branchless alignment operation
for ASYNCIO seq file factory, reusing an util class that was used
just for the MAPPED journal.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/franz1981/activemq-artemis fast_pow_2mod

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2479.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2479


commit 541172fd0f45540193a95e708e914abd3adc9cb9
Author: Francesco Nigro 
Date:   2018-12-20T10:11:36Z

ARTEMIS-2211 Avoid duplicate code for ByteBuffer pooling and alignment

Refactored thread local ByteBuffer pooling for both NIO & MAPPED
seq file factories and used fast branchless alignment operation
for ASYNCIO seq file factory, reusing an util class that was used
just for the MAPPED journal.




---


[GitHub] activemq-artemis pull request #:

2018-12-23 Thread franz1981
Github user franz1981 commented on the pull request:


https://github.com/apache/activemq-artemis/commit/dfa70680fed37d25aa3a6d0d6a0795e580495b6a#commitcomment-31769915
  
No need to synchronize it: `stores` is a concurrent hashmap and you can use 
the atomic  `computeIfAbsent` that use a lambda to populate the map


---


[GitHub] activemq-artemis pull request #2478: ARTEMIS-2210 PagingStore creation is no...

2018-12-23 Thread gaohoward
GitHub user gaohoward opened a pull request:

https://github.com/apache/activemq-artemis/pull/2478

ARTEMIS-2210 PagingStore creation is not properly synchronized

In PagingManagerImpl#getPageStore() the operations on the map 'stores'
are not synchronzed and it's possible that more than one paging store is
created for one address.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gaohoward/activemq-artemis a_2210

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2478.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2478


commit dfa70680fed37d25aa3a6d0d6a0795e580495b6a
Author: Howard Gao 
Date:   2018-12-24T02:42:18Z

ARTEMIS-2210 PagingStore creation is not properly synchronized

In PagingManagerImpl#getPageStore() the operations on the map 'stores'
are not synchronzed and it's possible that more than one paging store is
created for one address.




---


[GitHub] activemq-artemis pull request #2477: ARTEMIS-2190 move tests

2018-12-21 Thread jbertram
GitHub user jbertram opened a pull request:

https://github.com/apache/activemq-artemis/pull/2477

ARTEMIS-2190 move tests

The "jms-tests" module is deprecated and these tests should have never
gone in there. Moving them to the "integration-tests" module.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jbertram/activemq-artemis ARTEMIS-2190

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2477.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2477


commit 96bfc8ef29138d35c3f586a189478cef03c468b2
Author: Justin Bertram 
Date:   2018-12-21T15:15:45Z

ARTEMIS-2190 move tests

The "jms-tests" module is deprecated and these tests should have never
gone in there. Moving them to the "integration-tests" module.




---


[GitHub] activemq-artemis pull request #2476: Fix deadlock while getting queue messag...

2018-12-20 Thread wy96f
Github user wy96f closed the pull request at:

https://github.com/apache/activemq-artemis/pull/2476


---


[GitHub] activemq-artemis pull request #2476: Fix deadlock while getting queue messag...

2018-12-20 Thread wy96f
GitHub user wy96f opened a pull request:

https://github.com/apache/activemq-artemis/pull/2476

Fix deadlock while getting queue message count during cleanup

ARTEMIS-2123 introduced a deadlock bug.

jstack shows:

Thread 1:

at 
java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
at 
org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl.startPaging(PagingStoreImpl.java:481)
at 
org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl.addSize(PagingStoreImpl.java:739)
at 
org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl.nonDurableUp(PagingStoreImpl.java:952)
at 
org.apache.activemq.artemis.api.core.RefCountMessage.incrementRefCount(RefCountMessage.java:50)
at 
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl.incrementDelayDeletionCount(LargeServerMessageImpl.java:149)

locked 
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl@67359741
at 
org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl$LargeMessageDeliverer.(ServerConsumerImpl.java:1171)
at 
org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl$LargeMessageDeliverer.(ServerConsumerImpl.java:1151)
at 
org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl.handle(ServerConsumerImpl.java:431)
locked java.lang.Object@3f31a7b3
at 
org.apache.activemq.artemis.core.server.impl.QueueImpl.handle(QueueImpl.java:2809)
at 
org.apache.activemq.artemis.core.server.impl.QueueImpl.deliver(QueueImpl.java:2196)
locked org.apache.activemq.artemis.core.server.impl.QueueImpl@6c2bd0dc
at 
org.apache.activemq.artemis.core.server.impl.QueueImpl.access$1900(QueueImpl.java:105)
at 
org.apache.activemq.artemis.core.server.impl.QueueImpl$DeliverRunner.run(QueueImpl.java:3001)
locked 
org.apache.activemq.artemis.core.server.impl.QueueImpl$DeliverRunner@79dea1f9
Thread 2:

at 
org.apache.activemq.artemis.core.server.impl.QueueImpl.getScheduledCount(QueueImpl.java:1085)

blocked on org.apache.activemq.artemis.core.server.impl.QueueImpl@742b7e17
at 
org.apache.activemq.artemis.core.server.impl.QueueImpl.getMessageCount(QueueImpl.java:1077)
at 
org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl.deliverIfNecessary(PageCursorProviderImpl.java:610)
at 
org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl.cleanup(PageCursorProviderImpl.java:365)
locked 
org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl@5aa5010
at 
org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl$1.run(PageCursorProviderImpl.java:288)
 

The cleanup thread held pagingStore lock and requested queue lock. The 
largeMessageDeliver held queue lock and requested pagingStore lock. Deadlock 
occurred.

Put queue::getMessageCount outside of pagingstore lock to fix the bug.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wy96f/activemq-artemis 
fix_deadlock_caused_by_getmessagecount

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2476.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2476


commit f3f63fc961ea9573a4b7caaf1fe19cec15fcac3a
Author: yang wei 
Date:   2018-12-21T04:29:04Z

Fix deadlock while getting queue message count during cleanup




---


[GitHub] activemq-artemis pull request #2474: [ARTEMIS-1536]: Incorrect Journal files...

2018-12-20 Thread ehsavoie
Github user ehsavoie commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2474#discussion_r243354412
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 ---
@@ -163,13 +163,17 @@ protected void init(Configuration config, 
IOCriticalErrorListener criticalErrorL
 
   int fileSize = config.getJournalFileSize();
   // we need to correct the file size if its not a multiple of the 
alignement
-  int modulus = fileSize % journalFF.getAlignment();
-  if (modulus != 0) {
- int difference = modulus;
- int low = config.getJournalFileSize() - difference;
- int high = low + journalFF.getAlignment();
- fileSize = difference < journalFF.getAlignment() / 2 ? low : high;
- 
ActiveMQServerLogger.LOGGER.invalidJournalFileSize(config.getJournalFileSize(), 
fileSize, journalFF.getAlignment());
+  if (fileSize <= journalFF.getAlignment()) {
+ fileSize = journalFF.getAlignment();
--- End diff --

Done


---


[GitHub] activemq-artemis pull request #2474: [ARTEMIS-1536]: Incorrect Journal files...

2018-12-20 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2474#discussion_r243338608
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 ---
@@ -163,13 +163,17 @@ protected void init(Configuration config, 
IOCriticalErrorListener criticalErrorL
 
   int fileSize = config.getJournalFileSize();
   // we need to correct the file size if its not a multiple of the 
alignement
-  int modulus = fileSize % journalFF.getAlignment();
-  if (modulus != 0) {
- int difference = modulus;
- int low = config.getJournalFileSize() - difference;
- int high = low + journalFF.getAlignment();
- fileSize = difference < journalFF.getAlignment() / 2 ? low : high;
- 
ActiveMQServerLogger.LOGGER.invalidJournalFileSize(config.getJournalFileSize(), 
fileSize, journalFF.getAlignment());
+  if (fileSize <= journalFF.getAlignment()) {
+ fileSize = journalFF.getAlignment();
--- End diff --

I will just mere it.. but if you can add an unit test...



---


[GitHub] activemq-artemis pull request #2474: [ARTEMIS-1536]: Incorrect Journal files...

2018-12-20 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2474#discussion_r243338301
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 ---
@@ -163,13 +163,17 @@ protected void init(Configuration config, 
IOCriticalErrorListener criticalErrorL
 
   int fileSize = config.getJournalFileSize();
   // we need to correct the file size if its not a multiple of the 
alignement
-  int modulus = fileSize % journalFF.getAlignment();
-  if (modulus != 0) {
- int difference = modulus;
- int low = config.getJournalFileSize() - difference;
- int high = low + journalFF.getAlignment();
- fileSize = difference < journalFF.getAlignment() / 2 ? low : high;
- 
ActiveMQServerLogger.LOGGER.invalidJournalFileSize(config.getJournalFileSize(), 
fileSize, journalFF.getAlignment());
+  if (fileSize <= journalFF.getAlignment()) {
+ fileSize = journalFF.getAlignment();
--- End diff --

Lets not complicate things... but a unit test would be nice :)


---


[GitHub] activemq-artemis pull request #2474: [ARTEMIS-1536]: Incorrect Journal files...

2018-12-20 Thread ehsavoie
Github user ehsavoie commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2474#discussion_r243331449
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 ---
@@ -163,13 +163,17 @@ protected void init(Configuration config, 
IOCriticalErrorListener criticalErrorL
 
   int fileSize = config.getJournalFileSize();
   // we need to correct the file size if its not a multiple of the 
alignement
-  int modulus = fileSize % journalFF.getAlignment();
-  if (modulus != 0) {
- int difference = modulus;
- int low = config.getJournalFileSize() - difference;
- int high = low + journalFF.getAlignment();
- fileSize = difference < journalFF.getAlignment() / 2 ? low : high;
- 
ActiveMQServerLogger.LOGGER.invalidJournalFileSize(config.getJournalFileSize(), 
fileSize, journalFF.getAlignment());
+  if (fileSize <= journalFF.getAlignment()) {
+ fileSize = journalFF.getAlignment();
--- End diff --

There is already such an exception when creating the journal 
(java.lang.IllegalArgumentException: File size cannot be less than 1024 bytes
Caused by: java.lang.IllegalArgumentException: File size cannot be less 
than 1024 bytes") . Would setting to high be acceptable ?


---


[GitHub] activemq-artemis pull request #2474: [ARTEMIS-1536]: Incorrect Journal files...

2018-12-20 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2474#discussion_r243283678
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 ---
@@ -163,13 +163,17 @@ protected void init(Configuration config, 
IOCriticalErrorListener criticalErrorL
 
   int fileSize = config.getJournalFileSize();
   // we need to correct the file size if its not a multiple of the 
alignement
-  int modulus = fileSize % journalFF.getAlignment();
-  if (modulus != 0) {
- int difference = modulus;
- int low = config.getJournalFileSize() - difference;
- int high = low + journalFF.getAlignment();
- fileSize = difference < journalFF.getAlignment() / 2 ? low : high;
- 
ActiveMQServerLogger.LOGGER.invalidJournalFileSize(config.getJournalFileSize(), 
fileSize, journalFF.getAlignment());
+  if (fileSize <= journalFF.getAlignment()) {
+ fileSize = journalFF.getAlignment();
--- End diff --

I would actually throw an exception here. I wouldn't expect a block size 
that low. you can only have the header on the journal and nothing else if you 
specify fileSize == alignment.


---


[GitHub] activemq-artemis pull request #2475: ARTEMIS-2144 - tx begin failure in ra d...

2018-12-20 Thread andytaylor
GitHub user andytaylor opened a pull request:

https://github.com/apache/activemq-artemis/pull/2475

ARTEMIS-2144 - tx begin failure in ra doesn't get cleaned up

https://issues.apache.org/jira/browse/ARTEMIS-2144

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/andytaylor/activemq-artemis 
ARTEMIS-2144-master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2475.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2475


commit cc316d18c39bf1ac015d6192532a148c90cda4b4
Author: andytaylor 
Date:   2018-10-24T10:21:52Z

ARTEMIS-2144 - tx begin failure in ra doesn't get cleaned up

https://issues.apache.org/jira/browse/ARTEMIS-2144




---


[GitHub] activemq-artemis pull request #2474: [ARTEMIS-1536]: Incorrect Journal files...

2018-12-20 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2474#discussion_r243190151
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 ---
@@ -163,13 +163,17 @@ protected void init(Configuration config, 
IOCriticalErrorListener criticalErrorL
 
   int fileSize = config.getJournalFileSize();
   // we need to correct the file size if its not a multiple of the 
alignement
-  int modulus = fileSize % journalFF.getAlignment();
-  if (modulus != 0) {
- int difference = modulus;
- int low = config.getJournalFileSize() - difference;
- int high = low + journalFF.getAlignment();
- fileSize = difference < journalFF.getAlignment() / 2 ? low : high;
- 
ActiveMQServerLogger.LOGGER.invalidJournalFileSize(config.getJournalFileSize(), 
fileSize, journalFF.getAlignment());
+  if (fileSize <= journalFF.getAlignment()) {
--- End diff --

If `alignment` is a power of 2 (and it should be right?) you can use:
```
   public static long align(final long value, final long alignment) {
  return (value + (alignment - 1)) & ~(alignment - 1);
   }
```
That is into `BytesUtils::align`


---


[GitHub] activemq-artemis pull request #2474: [ARTEMIS-1536]: Incorrect Journal files...

2018-12-20 Thread ehsavoie
GitHub user ehsavoie opened a pull request:

https://github.com/apache/activemq-artemis/pull/2474

[ARTEMIS-1536]: Incorrect Journal filesize calculation where specified size 
is lest that the block size when using AIO.

* If the specified file size is under the fs block size then the resulting 
file size is 0. Setting it to the block size in this case.

Jira: https://issues.apache.org/jira/browse/ARTEMIS-1536

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ehsavoie/apache-activemq-artemis ARTEMIS-1536

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2474.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2474


commit 2e0e718d0c91906fbc0e9e0fa13ad85f7b58ca64
Author: Emmanuel Hugonnet 
Date:   2018-12-20T08:11:38Z

[ARTEMIS-1536]: Incorrect Journal filesize calculation where specified size 
is lest that the block size when using AIO.
* If the specified file size is under the fs block size then the
resulting file size is 0. Setting it to the block size in this case.

Jira: https://issues.apache.org/jira/browse/ARTEMIS-1536




---


[GitHub] activemq-artemis pull request #2473: ARTEMIS-1058 Jars in web tmp dir locked...

2018-12-19 Thread gaohoward
Github user gaohoward commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2473#discussion_r243139995
  
--- Diff: 
artemis-web/src/main/java/org/apache/activemq/artemis/component/WebServerComponent.java
 ---
@@ -237,28 +236,39 @@ public void start() throws Exception {
public void internalStop() throws Exception {
   server.stop();
   if (webContexts != null) {
+
  File tmpdir = null;
+ StringBuilder strBuilder = new StringBuilder();
+ boolean found = false;
  for (WebAppContext context : webContexts) {
 tmpdir = context.getTempDirectory();
 
-if (tmpdir != null && !context.isPersistTempDirectory()) {
+if (tmpdir != null && tmpdir.exists() && 
!context.isPersistTempDirectory()) {
//tmpdir will be removed by deleteOnExit()
-   //somehow when broker is stopped and restarted quickly
-   //this tmpdir won't get deleted sometimes
-   boolean fileDeleted = TimeUtils.waitOnBoolean(false, 5000, 
tmpdir::exists);
-
-   if (!fileDeleted) {
-  //because the execution order of shutdown hooks are
-  //not determined, so it's possible that the deleteOnExit
-  //is executed after this hook, in that case we force a 
delete.
-  FileUtil.deleteDirectory(tmpdir);
-  logger.debug("Force to delete temporary file on 
shutdown: " + tmpdir.getAbsolutePath());
-  if (tmpdir.exists()) {
- ActiveMQWebLogger.LOGGER.tmpFileNotDeleted(tmpdir);
-  }
+   //However because the URLClassLoader never release/close 
its opened
+   //jars the jar file won't be able to get deleted on Windows 
platform
+   //until after the process fully terminated. To fix this 
here arranges
+   //a separate process to try clean up the temp dir
+   FileUtil.deleteDirectory(tmpdir);
+   if (tmpdir.exists()) {
+  ActiveMQWebLogger.LOGGER.tmpFileNotDeleted(tmpdir);
+  strBuilder.append(tmpdir);
+  strBuilder.append(",");
+  found = true;
}
 }
  }
+
+ if (found) {
+String artemisHome = System.getProperty("artemis.home");
--- End diff --

This "artemis.home" property is not defined in configured in broker.xml 
file. It's only defined in artemis.profile and set to system property in 
artemis.cmd script (-Dartemis.home="$ARTEMIS_HOME")
I can't get it from the other configurations.



---


[GitHub] activemq-artemis pull request #2473: ARTEMIS-1058 Jars in web tmp dir locked...

2018-12-19 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2473#discussion_r243083234
  
--- Diff: 
artemis-web/src/main/java/org/apache/activemq/artemis/component/WebServerComponent.java
 ---
@@ -237,28 +236,39 @@ public void start() throws Exception {
public void internalStop() throws Exception {
   server.stop();
   if (webContexts != null) {
+
  File tmpdir = null;
+ StringBuilder strBuilder = new StringBuilder();
+ boolean found = false;
  for (WebAppContext context : webContexts) {
 tmpdir = context.getTempDirectory();
 
-if (tmpdir != null && !context.isPersistTempDirectory()) {
+if (tmpdir != null && tmpdir.exists() && 
!context.isPersistTempDirectory()) {
//tmpdir will be removed by deleteOnExit()
-   //somehow when broker is stopped and restarted quickly
-   //this tmpdir won't get deleted sometimes
-   boolean fileDeleted = TimeUtils.waitOnBoolean(false, 5000, 
tmpdir::exists);
-
-   if (!fileDeleted) {
-  //because the execution order of shutdown hooks are
-  //not determined, so it's possible that the deleteOnExit
-  //is executed after this hook, in that case we force a 
delete.
-  FileUtil.deleteDirectory(tmpdir);
-  logger.debug("Force to delete temporary file on 
shutdown: " + tmpdir.getAbsolutePath());
-  if (tmpdir.exists()) {
- ActiveMQWebLogger.LOGGER.tmpFileNotDeleted(tmpdir);
-  }
+   //However because the URLClassLoader never release/close 
its opened
+   //jars the jar file won't be able to get deleted on Windows 
platform
+   //until after the process fully terminated. To fix this 
here arranges
+   //a separate process to try clean up the temp dir
+   FileUtil.deleteDirectory(tmpdir);
+   if (tmpdir.exists()) {
+  ActiveMQWebLogger.LOGGER.tmpFileNotDeleted(tmpdir);
+  strBuilder.append(tmpdir);
+  strBuilder.append(",");
+  found = true;
}
 }
  }
+
+ if (found) {
+String artemisHome = System.getProperty("artemis.home");
--- End diff --

can't you get the property from other means? Fileconfiguration for instance?


---


[GitHub] activemq-artemis pull request #2470: Fixes for alerts from lgtm.com

2018-12-19 Thread jbertram
Github user jbertram commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2470#discussion_r243076837
  
--- Diff: 
artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
 ---
@@ -216,63 +216,63 @@ private static void printPages(DescribeJournal 
describeJournal,
 
  if (pgStore != null) {
 folder = pgStore.getFolder();
--- End diff --

The not-null check here on `pgStore` indicates that `pgStore` may, in fact, 
be null. If it is null then it will trigger a `NullPointerException` almost 
straight away.


---


[GitHub] activemq-artemis pull request #2470: Fixes for alerts from lgtm.com

2018-12-19 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2470#discussion_r243073035
  
--- Diff: 
artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/PrintData.java
 ---
@@ -216,63 +216,63 @@ private static void printPages(DescribeJournal 
describeJournal,
 
  if (pgStore != null) {
 folder = pgStore.getFolder();
--- End diff --

There's a semantic change here, are you sure?


---


[GitHub] activemq-artemis pull request #2464: ARTEMIS-1859 Incorrect routing with AMQ...

2018-12-19 Thread jbertram
Github user jbertram commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2464#discussion_r243026861
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
 ---
@@ -216,9 +219,23 @@ public RemotingConnection getRemotingConnection() {
   flow();
}
 
-   public RoutingType getRoutingType(Receiver receiver, SimpleString 
address) {
+   public RoutingType getRoutingType(Receiver receiver, SimpleString 
address, AMQPMessage message) {
   org.apache.qpid.proton.amqp.messaging.Target target = 
(org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget();
-  return target != null ? getRoutingType(target.getCapabilities(), 
address) : getRoutingType((Symbol[]) null, address);
+  // the target may be null or have no capabilities in the case of an 
anonymous producer
+  if (target != null && target.getCapabilities() != null) {
--- End diff --

I'll rework this. Thanks for the feedback, @gemmellr.


---


<    1   2   3   4   5   6   7   8   9   10   >