[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246578304
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
--- End diff --

if this was a collection, this would almost be throwing illegal 
arguement.


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246577156
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
--- End diff --

cough ;) ... for the same comment you left me  :P


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246574781
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private s

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246570293
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
 ---
@@ -864,21 +875,32 @@ private PageCursorInfo processACK(final PagePosition 
pos) {
   return info;
}
 
+   private void installTXCallback(final Transaction tx, final PagePosition 
position) {
+  installTXCallback(tx, position, -1);
+   }
+
/**
 * @param tx
 * @param position
+* @param persistentSize if negative it needs to be calculated on the 
fly
 */
-   private void installTXCallback(final Transaction tx, final PagePosition 
position) {
+   private void installTXCallback(final Transaction tx, final PagePosition 
position, final long persistentSize) {
   if (position.getRecordID() >= 0) {
  // It needs to persist, otherwise the cursor will return to the 
fist page position
  tx.setContainsPersistent();
   }
 
   PageCursorInfo info = getPageInfo(position);
   PageCache cache = info.getCache();
-  long size = 0;
   if (cache != null) {
- size = 
getPersistentSize(cache.getMessage(position.getMessageNr()));
+ final long size;
+ if (persistentSize < 0) {
--- End diff --

surely this is checking for something like if its -1? not just that its 
negative which would be worrying if so this should be more explicit to just 
be checking -1, and if anything else thats negative, means illegal argument


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246569523
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568743
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
--- End diff --

surely if this occurs, theres some issue..


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568579
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
--- End diff --

naming of fields is missaligned, some places its called size, some others 
you call it index, this is hard to follow.


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246567995
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246566042
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246563763
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -28,15 +28,49 @@
 /**
  * This is the same as PageCache, however this is for the page that's 
being currently written.
  */
--- End diff --

@franz1981 Id agree with @clebertsuconic here, a bit like what ive done 
with priority consumers, i ended up splitting out the collections logic, which 
has ended up making it cleaner, and easier to reason with. (and as youve marked 
out on its pr, more testable ;) )


---


[GitHub] activemq-artemis issue #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2490
  
@franz1981 pushed following changes based on your comments that agree with, 
for others i have left comment for us to discuss. 

Changes:
1) Ensured total size of the priority collection can never exceed 
Integer.MAX_VALUE, by ensuring this on add, thus the edge case you were worried 
about of the calcSize being greater than int cannot occur. Note if someone has 
that many consumers, we probably want to have some discussions with them as 
they would be some power user!!! ;)

2)  Avoid double volatile read of changedIterator in reset method.

3) Removed need for a cast in MultiResettableIterator


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246558298
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
--- End diff --

currentIterator is ONLY updated by reset, which should ONLY be called by 
the same threads operating on the ResettableIterator interface.

When add or remove of consumer occurs a new iterator is parked into the 
volatile changedIterator (using an atomic field updater), so the next reset can 
pick it up. 



---


[GitHub] activemq-artemis issue #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2490
  
@gemmellr pushed changes based on latest comments, thanks.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246555304
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
+  }
+  return this;
+   }
+
+   @Override
+   public boolean add(T t) {
+  boolean result = consumers.add(t);
+  if (result) {
+ changedIteratorFieldUpdater.set(this, 
consumers.resettableIterator());
--- End diff --

see other comment


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246525755
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
--- End diff --

The compare and set is because the new iterator could be created whilst we 
are mid resetting. The iterator is single thread usage only. Updating the 
collection should be thread safe and able to occur in parralell.

This is much like iterating copyonwriteareatlist. Iterator works on a 
snapshot view and this is single thread. Updating the collection itself can be 
concurrent and occur whilst someone else is iterating 


---


[GitHub] activemq-artemis issue #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2490
  
@franz1981 good point around adding further tests to the extracted out 
bits. Agree it will make everything more robust


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246523218
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiIterator implements Iterator {
+
+   private final Iterator[] iterators;
+   int index = -1;
--- End diff --

Agreed


---


[GitHub] activemq-artemis issue #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2490
  
@gemmellr ok will reduce back down the extra tests i added. To those you 
suggest. 

Also re activemq5. Fair enough, ill reduce / alter this then.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246521520
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiIterator implements Iterator {
--- End diff --

Yeah i had a version like that but it makes using simple MultiIterator ugly 
to use. Uneededly. And to extend if needed only needs a cast. In case of 
resetable.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520970
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+/**
+ * Extends MultiIterator, adding the ability if the underlying iterators 
are resettable, then its self can reset.
+ * It achieves this by going back to the first iterator, and as moves to 
another iterator it resets it.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiResettableIterator extends MultiIterator 
implements ResettableIterator {
--- End diff --

You could but this then makes using MutiIterator ugly as hell with little 
gain


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520411
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private s

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520138
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private s

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520121
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private s

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246518747
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
--- End diff --

Nice


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246518669
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -84,6 +94,10 @@ public void setFilterString(SimpleString filterString) {
   this.filterString = filterString;
}
 
+   public void setPriority(byte priority) {
--- End diff --

Keeping inline with the rest of the fields and general approach. Id rather 
keep to status quo


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246518313
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
--- End diff --

If it fails it means whilst we were reseting and changing reference to the 
new iterator. a consumer was added or removed and another new iterator was 
created (as this can occur whilsy we are iterating or resetting.) And means 
simply whilst this reset updated its iterator, on next reset it needs to switch 
over its again. We wouldnt want to lazy set to null 


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246516870
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
--- End diff --

Like any iterator and iterator should only be interacted by one thread at a 
time.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246516479
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
+  }
+  return this;
+   }
+
+   @Override
+   public boolean add(T t) {
+  boolean result = consumers.add(t);
+  if (result) {
+ changedIteratorFieldUpdater.set(this, 
consumers.resettableIterator());
+  }
+  return result;
+   }
+
+   @Override
+   public boolean remove(T t) {
+  boolean result = consumers.remove(t);
+  if (result) {
+ changedIteratorFieldUpdater.set(this, 
consumers.resettableIterator());
--- End diff --

Agreed. But this is not hot path, and if anything would allow in future for 
consumers to be concurrently added or removed without the existing bit sync 
blocks there is today in queueimpl


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246515810
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ---
@@ -2409,14 +2410,10 @@ private void deliver() {
   // Either the iterator is empty or the consumer is busy
   int noDelivery = 0;
 
-  int size = 0;
-
-  int endPos = -1;
-
   int handled = 0;
 
   long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
-
+  consumers.reset();
--- End diff --

Its fine. We actually only want to reset on a succesful handled nomatch or 
expired. Or before starting iterating


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246515455
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ---
@@ -2497,45 +2484,36 @@ private void deliver() {
 
 
   handled++;
-
+  consumers.reset();
   continue;
}
 
if (logger.isTraceEnabled()) {
   logger.trace("Queue " + this.getName() + " is delivering 
reference " + ref);
}
 
-   // If a group id is set, then this overrides the consumer 
chosen round-robin
-
-   SimpleString groupID = extractGroupID(ref);
-
-   if (groupID != null) {
-  groupConsumer = groups.get(groupID);
+   final SimpleString groupID = extractGroupID(ref);
+   groupConsumer = getGroupConsumer(groupConsumer, groupID);
 
-  if (groupConsumer != null) {
- consumer = groupConsumer;
-  }
-   }
-
-   if (exclusive && redistributor == null) {
-  consumer = consumerList.get(0).consumer;
+   if (groupConsumer != null) {
+  consumer = groupConsumer;
}
 
HandleStatus status = handle(ref, consumer);
 
if (status == HandleStatus.HANDLED) {
 
-  deliveriesInTransit.countUp();
-
-  handledconsumer = consumer;
-
-  removeMessageReference(holder, ref);
-
   if (redistributor == null) {
  handleMessageGroup(ref, consumer, groupConsumer, 
groupID);
   }
 
+  deliveriesInTransit.countUp();
+
+
+  removeMessageReference(holder, ref);
+  handledconsumer = consumer;
   handled++;
+  consumers.reset();
--- End diff --

Yes its fine. Would have no negative effect, and actually have same 
behaviour as old. 


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246513536
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private s

[GitHub] activemq-artemis issue #2492: ARTEMIS-2222 why the position remains unchange...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2492
  
Point being if this is a bug, please can there be a test that exposes the 
issues, so a) this can be reviewed b) we can validate the fix c) ensure there 
is no regression.


---


[GitHub] activemq-artemis issue #2445: ARTEMIS-2187 remove page from softcache before...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2445
  
Can we have some tests here please, a) to show the issue ensuring the fix 
does fix it, and b) to ensure no regression.


---


[GitHub] activemq-artemis issue #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2490
  
@gemmellr thanks for review, if you could recheck the AMQPSessionCallback 
for me, to make sure i understood you. 

As for the Openwire test case, this was a simple port over of the existing 
activemq5 test case as untouched as possible, i agree we could reduce the time 
but id rather (at least for this release) keep it the same, so we can be sure 
feature works for openwire same as activemq5.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140306
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
+  AmqpReceiver receiver1 = session.createReceiver(getQueueName(), 
null, false, false, properties1);
+  receiver1.flow(100);
+
+  Map properties2 = new HashMap<>();
+  properties2.put(Symbol.getSymbol("priority"), 10);
+  AmqpReceiver receiver2 = session.createReceiver(getQueueName(), 
null, false, false, properties2);
+  receiver2.flow(100);
+
+  Map properties3 = new HashMap<>();
+  properties3.put(Symbol.getSymbol("priority"), 5);
+  AmqpReceiver receiver3 = session.createReceiver(getQueueName(), 
null, false, false, properties3);
+  receiver3.flow(100);
+
+  sendMessages(getQueueName(), 5);
+
+
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message1 = receiver1.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message3 = receiver3.receive(250, 
TimeUnit.MILLISECONDS);
+ assertNotNull("did not receive message first time", message1);
+ assertEquals("MessageID:" + i, message1.getMessageId());
+ message1.accept();
+ assertNull("message is not meant to goto lower priority 
receiver", message2);
+ assertNull("message is not meant to goto lower priority 
receiver", message3);
+  }
+
+  //Close the high priority receiver
+  receiver1.close();
+
+  sendMessages(getQueueName(), 5);
+
+  //Check messages now goto next priority receiver
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message3 = receiver3.receive(250, 
TimeUnit.MILLISECONDS);
--- End diff --

changed to receiveNoWait()


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140214
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
--- End diff --

changed up the ordering in this test also.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140041
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
+  AmqpReceiver receiver1 = session.createReceiver(getQueueName(), 
null, false, false, properties1);
+  receiver1.flow(100);
+
+  Map properties2 = new HashMap<>();
+  properties2.put(Symbol.getSymbol("priority"), 10);
+  AmqpReceiver receiver2 = session.createReceiver(getQueueName(), 
null, false, false, properties2);
+  receiver2.flow(100);
+
+  Map properties3 = new HashMap<>();
+  properties3.put(Symbol.getSymbol("priority"), 5);
+  AmqpReceiver receiver3 = session.createReceiver(getQueueName(), 
null, false, false, properties3);
+  receiver3.flow(100);
+
+  sendMessages(getQueueName(), 5);
+
+
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message1 = receiver1.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
--- End diff --

changed to receiveNoWait


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135818
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+/**
+ * Exclusive Test
+ */
+public class ConsumerPriorityTest extends JMSTestBase {
+
+   private SimpleString queueName = 
SimpleString.toSimpleString("jms.consumer.priority.queue");
--- End diff --

now using getName ... again nice nice


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135697
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 ---
@@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext 
protonSender,
   return consumer;
}
 
+   private int getPriority(Map properties) {
+  Integer value = properties == null ? null : (Integer) 
properties.get(PRIORITY);
--- End diff --

Have changed if you can recheck to make sure i understood.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135542
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -52,6 +57,7 @@ public String toString() {
   StringBuffer buff = new StringBuffer(getParentString());
   buff.append(", queueName=" + queueName);
   buff.append(", filterString=" + filterString);
+  buff.append(", priority=" + priority);
--- End diff --

done


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246133832
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+/**
+ * Exclusive Test
+ */
+public class ConsumerPriorityTest extends JMSTestBase {
+
+   private SimpleString queueName = 
SimpleString.toSimpleString("jms.consumer.priority.queue");
--- End diff --

Nice i didnt know about that in the parent class. will change to use this..


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246132135
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java
 ---
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+
+import 
org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Before;
+import org.junit.Test;
+
+public class QueueConsumerPriorityTest extends BasicOpenWireTest {
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+  super.setUp();
+  this.makeSureCoreQueueExist("QUEUE.A");
+   }
+   @Test
+   public void testQueueConsumerPriority() throws JMSException, 
InterruptedException {
+  connection.start();
+  Session consumerLowPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  Session consumerHighPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  assertNotNull(consumerHighPriority);
+  Session senderSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  String queueName = "QUEUE.A";
+  ActiveMQQueue low = new ActiveMQQueue(queueName + 
"?consumer.priority=1");
+  MessageConsumer lowConsumer = 
consumerLowPriority.createConsumer(low);
+
+  ActiveMQQueue high = new ActiveMQQueue(queueName + 
"?consumer.priority=2");
+  MessageConsumer highConsumer = 
consumerLowPriority.createConsumer(high);
+
+  ActiveMQQueue senderQueue = new ActiveMQQueue(queueName);
+
+  MessageProducer producer = senderSession.createProducer(senderQueue);
+
+  Message msg = senderSession.createTextMessage("test");
+  for (int i = 0; i < 1000; i++) {
+ producer.send(msg);
+ assertNotNull("null on iteration: " + i, 
highConsumer.receive(1000));
+  }
+  assertNull(lowConsumer.receive(2000));
--- End diff --

this is the original test from ActiveMQ5 i was trying to keep this test as 
much un-touched as possible to ensure behavior is the same.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246128551
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
--- End diff --

This is actually tested on the queueconsumerimpl test. But agree we can do 
same here


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246128143
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -104,6 +119,11 @@ public void decodeRest(final ActiveMQBuffer buffer) {
   filterString = buffer.readNullableSimpleString();
   browseOnly = buffer.readBoolean();
   requiresResponse = buffer.readBoolean();
+  if (buffer.readableBytes() > 0) {
--- End diff --

This is typical pattern used for adding safely a new field that can be 
either nullable or defaultable. Used many times over.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246127864
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -52,6 +57,7 @@ public String toString() {
   StringBuffer buff = new StringBuffer(getParentString());
   buff.append(", queueName=" + queueName);
   buff.append(", filterString=" + filterString);
+  buff.append(", priority=" + priority);
--- End diff --

Makes sense


---


[GitHub] activemq-artemis issue #2492: ARTEMIS-2222 why the position remains unchange...

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2492
  
This had a specific purpose so we ignore deleted or completed pages.

As per comment in code.

// any deleted or complete page will be ignored on the 
moveNextPage, we will just keep going


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r245960681
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 ---
@@ -730,22 +793,29 @@ public int deliverMessage(MessageReference 
messageReference, int deliveryCount,
 
 if (preSettle) {
// Presettled means the client implicitly accepts any 
delivery we send it.
-   sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   try {
+  sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   } catch (Exception e) {
+  log.debug(e.getMessage(), e);
+   }
delivery.settle();
 } else {
sender.advance();
 }
 
 connection.flush();
  } finally {
-connection.unlock();
+synchronized (creditsLock) {
+   pending.decrementAndGet();
+}
+if (releaseRequired) {
+   ((NettyReadable) sendBuffer).getByteBuf().release();
+}
  }
+  } catch (Exception e) {
+ log.warn(e.getMessage(), e);
 
- return size;
-  } finally {
- if (releaseRequired) {
-((NettyReadable) sendBuffer).getByteBuf().release();
- }
+ // important todo: Error treatment
--- End diff --

Did you look over this? 


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
@qihongxu big thanks for all the effort on this!!


---


[GitHub] activemq-artemis issue #2489: ARTEMIS-2220 Fix PageCursorStressTest::testSim...

2019-01-07 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2489
  
@franz1981 LGTM then :) merge...merge...merge


---


[GitHub] activemq-artemis issue #2490: V2 196

2019-01-07 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2490
  
@franz1981 great thanks a million :)


---


[GitHub] activemq-artemis issue #2490: V2 196

2019-01-07 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2490
  
@franz1981 did you get a chance to look, do you think this is better than 
original solution? 

Am keen to get this feature into the next release cut.


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-07 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
@franz1981 i think well have to do some more real world testing with it 
(difference between a isolated bench and a full e2e test), with @qihongxu help 
hopefully, it might be something odd that by releasing some performance it 
causes an odd bottleneck somewhere else.

Are you ok, if once full CI passes we merge as is, and continue this on a 
separate pr?


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-07 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
@franz1981 based on this, shall we merge this pr as is which is quite 
impressive result at a combined 49.4k.

And then work on livepagecache changes separately? 





---


[GitHub] activemq-artemis issue #2489: ARTEMIS-2220 Fix PageCursorStressTest::testSim...

2019-01-07 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2489
  
@franz1981 why not fix up FakeQueue to correctly set it


---


[GitHub] activemq-artemis issue #2490: V2 196

2019-01-07 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2490
  
@franz1981 just ignore class comments, theyre the originals still, ill need 
to change, but wanted to get to you quickly so you have chance to look over. If 
you think this is better ill make final tidyup bits, such as class comments and 
replace the real PR's branch. 


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-07 Thread michaelandrepearce
GitHub user michaelandrepearce opened a pull request:

https://github.com/apache/activemq-artemis/pull/2490

V2 196

@franz1981 an alternative so we don't have to have a copy of 
CopyOnWriteArrayList, it does mean on add or remove consumer we have to invoke 
toArray which causes a copy, but this is not on hot path, so i think we should 
be good, and avoids us having to clone a jvm class.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michaelandrepearce/activemq-artemis V2-196

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2490.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2490


commit d731ffe7288cb857fef1b97deff4b7dc18aeb6d7
Author: Michael André Pearce 
Date:   2018-12-31T13:22:02Z

ARTEMIS-196 Implement Consumer Priority

Add consumer priority support
Includes refactor of consumer iterating in QueueImpl to its own logical 
class, to be able to implement.
Add OpenWire JMS Test - taken from ActiveMQ5
Add Core JMS Test
Add AMQP Test
Add Docs

commit b0c775840fc98b5d3f5f3485802de3270c614d9a
Author: Michael André Pearce 
Date:   2019-01-05T09:48:24Z

Extract




---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-04 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
@franz1981 did you send a pr to @qihongxu branch so he can merge it and 
this pr picks it up?

Be great to see a final stat in @qihongxu test env


---


[GitHub] activemq-artemis issue #2482: ARTEMIS-2214 Cache durable in Pag...

2019-01-04 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2482
  
@franz1981 im away in another country without my main computer with my 
apache git ssh cert key. Could you merge this?


---


[GitHub] activemq-artemis issue #2482: ARTEMIS-2214 Cache durable in Pag...

2019-01-04 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2482
  
@qihongxu looks good to me now.


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-04 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
@qihongxu looks good to me.

@franz1981 whats the current PR look like in heat maps?


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-04 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
> em.Could you please tell us which issues? We need to verify how it 
affects our cluster.

The big issue im relating to, which became a night mare for my 
organisation, was that under high concurrency (high throughput and low latency 
broker setup), the buffers can get mixed up, and was causing index out of 
bounds issues.

Fixes were multiple:

https://github.com/apache/activemq-artemis/commit/024db5bd3c1656d265daf60c9e3a362d53b9088b

https://github.com/apache/activemq-artemis/commit/da7fb89037481fb6343c760010d4553ff28ac87e

Im also aware there have been some other concurrency fixes for smaller 
issues.



---


[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...

2019-01-04 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2484#discussion_r245239811
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 ---
@@ -278,21 +293,26 @@ public boolean isPaging() {
   lock.readLock().lock();
 
   try {
- if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
-return false;
- }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
-return isFull();
- }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) {
-return isFull();
- }
- return paging;
+ return isPagingDirtyRead();
   } finally {
  lock.readLock().unlock();
   }
}
 
+   @Override
+   public boolean isPagingDirtyRead() {
+  if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
--- End diff --

@wy96f what @franz1981 is trying to say, is we can do the volatile read 
just once, by adding one line e.g.


 AddressFullMessagePolicy addressFullMessagePolicy = 
this.addressFullMessagePolicy;
 if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
return false;
 }
 if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
return isFull();
 }
 if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) {
return isFull();
 }
 return paging;


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-04 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
> @michaelandrepearce
> 
> > Do you get this on master or this PR (im mean is that a typo)?
> 
> I've got that on master!

ok so i think we not need worry on this for realms of this PR. (probably 
needs looking into, but doesn;t need to be solved in this pr - imo) 


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-04 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
> If we return `true` from the dirty read we can just return it, while if 
we found the it `false` we could attempt to enter the read lock and validate 
that's not paging for real.

Ive literally gone through every case, what occurs is we call isPaging 
within an if statement, and then do some logic after, as such anyhow any action 
we do within these if statements anyhow will be based off a stale state. 

Im starting to just think we make isPaging not use a read lock  (aka make 
it dirty), as its only used in queueimpl like mentioned and for queuecontrol 
(aka the admin gui)


---


[GitHub] activemq-artemis issue #2470: Fixes for alerts from lgtm.com

2019-01-04 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2470
  
+1 from me, nice! 


---


[GitHub] activemq-artemis issue #2477: ARTEMIS-2190 move tests

2019-01-04 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2477
  
LGTM - would merge for you but currently abroad and don;t have access to my 
computer with my apache git ssh key, feel free to merge.


---


[GitHub] activemq-artemis issue #2481: ARTEMIS-2213 don't expire critical component i...

2019-01-04 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2481
  
@wy96f if you could close, only the person opening can close, or when we 
merge.


---


[GitHub] activemq-artemis pull request #2482: ARTEMIS-2214 Cache durable.

2019-01-04 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2482#discussion_r245233815
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
 ---
@@ -120,14 +126,16 @@ public PagedReferenceImpl(final PagePosition position,
  this.largeMessage = message.getMessage().isLargeMessage() ? 
IS_LARGE_MESSAGE : IS_NOT_LARGE_MESSAGE;
  this.transactionID = message.getTransactionID();
  this.messageID = message.getMessage().getMessageID();
-
+ this.durable = message.getMessage().isDurable() ? IS_DURABLE : 
IS_NOT_DURABLE;
+ this.deliveryTime = 
message.getMessage().getScheduledDeliveryTime();
  //pre-cache the message size so we don't have to reload the 
message later if it is GC'd
  getPersistentSize();
   } else {
  this.largeMessage = UNDEFINED_IS_LARGE_MESSAGE;
  this.transactionID = -2;
  this.messageID = -1;
  this.messageSize = -1;
+ this.durable = UNDEFINED_IS_DURABLE;
--- End diff --

for completeness (its a nit) set deliveryTime to its undefined value here.


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-04 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
> @michaelandrepearce @qihongxu Just a lil OT but I'm getting these warns 
on master:
> 
> ```
> 2019-01-03 17:36:44,408 WARN  [org.apache.activemq.artemis.journal] 
AMQ142007: Can not find record 8,103,136 during compact replay
> 2019-01-03 17:36:44,408 WARN  [org.apache.activemq.artemis.journal] 
AMQ142007: Can not find record 8,103,139 during compact replay
> 2019-01-03 17:36:44,408 WARN  [org.apache.activemq.artemis.journal] 
AMQ142007: Can not find record 8,103,142 during compact replay
> 2019-01-03 17:36:44,408 WARN  [org.apache.activemq.artemis.journal] 
AMQ142007: Can not find record 8,103,145 during compact replay
> 2019-01-03 17:36:44,408 WARN  [org.apache.activemq.artemis.journal] 
AMQ142007: Can not find record 8,103,148 during compact replay
> 2019-01-03 17:36:44,408 WARN  [org.apache.activemq.artemis.journal] 
AMQ142007: Can not find record 8,103,151 during compact replay
> 2019-01-03 17:36:44,408 WARN  [org.apache.activemq.artemis.journal] 
AMQ142007: Can not find record 8,103,154 during compact replay
> ```

Do you get this on master? If not then that is a BIG worry


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
> Looking at the CPU graph instead I can see many odd things ie Compaction 
is stealing lot of cpu and I/O:

Nice find.


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
@franz1981 nice graphs, looks like essentially isPaging in QueueImpl unless 
everything is dirtyRead then we simply move the problem somewhere else, i think 
we probably need to make then a more general call if all these are safe to have 
dirty read or not. (Im starting to feel like  


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
> > > CursorIterator:hasNext
> > 
> > 
> > Im bit concerned with this doing a dirty read, as this isnt something 
that is trigger an ascyn action, in actual fact the hasNext is purposefully 
synchronized. (especially as recently some concurrency issues have been found 
in paging (e.g. howards current pr) im sensitive to us being too hap hazard 
here.
> 
> I don't get why isPaging() in hasNext() needs to be consistent. The 
paging status can change after isPaging() unless we readlock isPaging() and 
subsequent operations. Without readlock, a) if isPaging() returns true, but the 
other thread set it to false, subsequent next() call reads no data and returns 
null; b) if isPaging() returns false, but the other thread set it to true(a new 
message coming), deliverAsync would be called later anyway.

if thats the case then the sync method could be removed also, though i 
think @clebertsuconic has given some further info to why this is like it is.


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
So out the two, the checkDepage is the safer one to use a dirty read, so id 
expect to see that changed first before anything else.


---


[GitHub] activemq-artemis issue #2482: ARTEMIS-2214 Cache durable in PagedRe...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2482
  
Yes you could add this to set in the section within the constructor where 
message != null, seems sensible.


---


[GitHub] activemq-artemis issue #2482: ARTEMIS-2214 Cache durable in PagedRe...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2482
  
Im very cautious of optimising for one case
 Its very unusual for consumers to go away as it is good design in most mom 
a consumer is long lived.

So without it removed im -1. As i stated in an ideal world everything in 
paging would be off heap. 


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
@qihongxu i dont see checkDepage using the dirtyRead in current commit


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
> The version I have shown is just master ie any read lock is there!

I get that, im more relfecting on this change in PR.


---


[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2484#discussion_r245095523
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
 ---
@@ -1350,7 +1350,7 @@ public synchronized boolean hasNext() {
 return true;
  }
 
- if (!pageStore.isPaging()) {
+ if (!pageStore.isPagingDirtyRead()) {
--- End diff --

Concern here is this ins't an async case.

Btw i cannot see the change in QueueImpl to use the new paging dirty read.


---


[GitHub] activemq-artemis pull request #2484: ARTEMIS-2216 Use a specific executor fo...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2484#discussion_r245094809
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 ---
@@ -278,21 +293,26 @@ public boolean isPaging() {
   lock.readLock().lock();
 
   try {
- if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
-return false;
- }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
-return isFull();
- }
- if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP) {
-return isFull();
- }
- return paging;
+ return isPagingDirtyRead();
   } finally {
  lock.readLock().unlock();
   }
}
 
+   @Override
+   public boolean isPagingDirtyRead() {
+  if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
--- End diff --

nice idea!


---


[GitHub] activemq-artemis issue #2479: ARTEMIS-2211 Avoid duplicate code for ByteBuff...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2479
  
@franz1981 this is out my league on this, im not so familiar with journal 
bits/logic, you ok with me leaving this for @clebertsuconic ?


---


[GitHub] activemq-artemis issue #2480: ARTEMIS-2212 Avoid using CLQ on ServerConsumer...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2480
  
@franz1981, did you make the change to syncronized methods? As this PR is 
about performance im for getting the most we can in one ;)


---


[GitHub] activemq-artemis issue #2481: ARTEMIS-2213 don't expire critical component i...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2481
  
Can we close this for the interim?


---


[GitHub] activemq-artemis issue #2483: ARTEMIS-2215 largemessage have been consumed b...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2483
  
Im less familar with the journal stuff in general from what i can tell 
though it looks good, so will let @franz1981 give a thumbs up on the logic 
change, but from better use of collection libs +1 one me.


---


[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2483#discussion_r245090041
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalStorageManager.java
 ---
@@ -309,16 +309,17 @@ public void run() {
 */
@Override
protected void performCachedLargeMessageDeletes() {
-  for (Long largeMsgId : largeMessagesToDelete) {
- SequentialFile msg = createFileForLargeMessage(largeMsgId, 
LargeMessageExtension.DURABLE);
+  for (LargeServerMessage largeServerMessage : 
largeMessagesToDelete.values()) {
--- End diff --

Usage of LongConcurrentHashMap  looks much better.


---


[GitHub] activemq-artemis issue #2482: ARTEMIS-2214 Cache durable in PagedRe...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2482
  
@qihongxu nudge


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
> CursorIterator:hasNext

Im bit concerned with this doing a dirty read, as this isnt something that 
is trigger an ascyn action, in actual fact the hasNext is purposefully 
synchronized. 

@franz1981 if checkDepage is removed from the lock, why would DepageRunner 
now be locking it up so bad? 


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
> > Instead of transaction consumer you could use client acknowledge or 
even individual acknowledge.
> 
> It seems that both client-acknowledge and individual-acknowledge mode 
will finally use Transaction at server side. Considering these modes have no 
obvious difference in performance, we choose to use transaction as it’s more 
reliable and supports rollback.

@qihongxu you're using JMS api not core then?


---


[GitHub] activemq-artemis issue #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2488
  
Cool so will leave as is then. thanks for the review!


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245030347
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi 
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent 
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to 
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of 
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a 
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so 
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the 
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling 
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl extends 
AbstractCollection implements QueueConsumers {
+
+   private final QueueConsumersIterator iterator = new 
QueueConsumersIterator<>(this, true);
+
+   private volatile Level[] levels;
+   private volatile int size;
+   private volatile T first;
+
+   private void setArray(Level[] array) {
+  this.levels = array;
+   }
+
+   private Level[] getArray() {
+  return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+  levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Level[] newLevelArrayInstance(int length) {
+  return (Level[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   @Override
+   public Set getPriorites() {
+  Level[] levels = getArray();
+  return 
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+  return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  iterator.reset();
+  return this;
+   }
+
+   @Override
+   public void forEach(Consumer action) {
+  Objects.requireNonNull(action);
+  Level[] current = getArray();
+  int len = current.length;
+  for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+  }
+   }
+
+   private Level getLevel(int level, boolean createIfMis

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245029912
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -25,20 +26,24 @@
 
private SimpleString filterString;
 
+   private int priority;
--- End diff --

marking resolved.


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245029854
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.Iterator;
+
+public interface ResetableIterator extends Iterator {
+
+   /**
+* Resets the iterator so you can re-iterate over all elements.
+*
+* @return itself, this is just for convenience.
+*/
+   ResetableIterator reset();
--- End diff --

ill mark resolved then.


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245029745
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ---
@@ -2497,45 +2494,36 @@ private void deliver() {
 
 
   handled++;
-
+  consumers.reset();
   continue;
}
 
if (logger.isTraceEnabled()) {
   logger.trace("Queue " + this.getName() + " is delivering 
reference " + ref);
}
 
-   // If a group id is set, then this overrides the consumer 
chosen round-robin
+   final SimpleString groupID = extractGroupID(ref);
+   groupConsumer = getGroupConsumer(groupConsumer, groupID);
 
-   SimpleString groupID = extractGroupID(ref);
-
-   if (groupID != null) {
-  groupConsumer = groups.get(groupID);
-
-  if (groupConsumer != null) {
- consumer = groupConsumer;
-  }
-   }
-
-   if (exclusive && redistributor == null) {
-  consumer = consumerList.get(0).consumer;
+   if (groupConsumer != null) {
+  consumer = groupConsumer;
}
 
HandleStatus status = handle(ref, consumer);
 
if (status == HandleStatus.HANDLED) {
 
-  deliveriesInTransit.countUp();
-
-  handledconsumer = consumer;
-
-  removeMessageReference(holder, ref);
-
   if (redistributor == null) {
  handleMessageGroup(ref, consumer, groupConsumer, 
groupID);
   }
 
+  deliveriesInTransit.countUp();
+
+
+  removeMessageReference(holder, ref);
+  handledconsumer = consumer;
   handled++;
+  consumers.reset();
--- End diff --

resolving as discussed else where


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245029528
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi 
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent 
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to 
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of 
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a 
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so 
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the 
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling 
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl extends 
AbstractCollection implements QueueConsumers {
+
+   private final QueueConsumersIterator iterator = new 
QueueConsumersIterator<>(this, true);
+
+   private volatile Level[] levels;
+   private volatile int size;
+   private volatile T first;
+
+   private void setArray(Level[] array) {
+  this.levels = array;
+   }
+
+   private Level[] getArray() {
+  return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+  levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Level[] newLevelArrayInstance(int length) {
+  return (Level[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   @Override
+   public Set getPriorites() {
+  Level[] levels = getArray();
+  return 
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+  return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  iterator.reset();
+  return this;
+   }
+
+   @Override
+   public void forEach(Consumer action) {
+  Objects.requireNonNull(action);
+  Level[] current = getArray();
+  int len = current.length;
+  for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+  }
+   }
+
+   private Level getLevel(int level, boolean createIfMis

[GitHub] activemq-artemis issue #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2488
  
@franz1981 is that java 8? I thought it was Java 9+


---


[GitHub] activemq-artemis issue #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2488
  
Reflection would not be good here, getArray is HOT path, its reason why i 
need it.


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245013090
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi 
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent 
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to 
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of 
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a 
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so 
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the 
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling 
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl extends 
AbstractCollection implements QueueConsumers {
+
+   private final QueueConsumersIterator iterator = new 
QueueConsumersIterator<>(this, true);
+
+   private volatile Level[] levels;
+   private volatile int size;
+   private volatile T first;
+
+   private void setArray(Level[] array) {
+  this.levels = array;
+   }
+
+   private Level[] getArray() {
+  return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+  levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Level[] newLevelArrayInstance(int length) {
+  return (Level[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   @Override
+   public Set getPriorites() {
+  Level[] levels = getArray();
+  return 
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+  return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  iterator.reset();
+  return this;
+   }
+
+   @Override
+   public void forEach(Consumer action) {
+  Objects.requireNonNull(action);
+  Level[] current = getArray();
+  int len = current.length;
+  for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+  }
+   }
+
+   private Level getLevel(int level, boolean createIfMis

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245011975
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java
 ---
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.Iterator;
+
+public interface ResetableIterator extends Iterator {
+
+   /**
+* Resets the iterator so you can re-iterate over all elements.
+*
+* @return itself, this is just for convenience.
+*/
+   ResetableIterator reset();
--- End diff --

-1 we are not closing the iterator, nor would this go in a try resources 
block., we are simply resetting the iterator so it marks the endpos = startpos, 
so we continue to round robin, as successfully handled a message.


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245012214
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,648 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.server.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the consumers, it models around multi 
getPriority (getPriority) varient of
+ * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent 
safe and non blocking.
+ *
+ * N.b. we could have made Level extend CopyOnWriteArrayList but due to 
the need to access the internal Array structure,
+ * which is privileged to package java.util.concurrent. As such much of 
Level is is taken from here.
+ *
+ * Modifications like in CopyOnWriteArrayList are single threaded via a 
single re-entrant lock.
+ *
+ * Iterators iterate over a snapshot of the internal array structure, so 
will not see mutations.
+ *
+ * There can only be one resettable iterable view, this is exposed at the 
top getPriority,
+ * and is intended for use in QueueImpl only.
+ * All other iterators are not reset-able and are created on calling 
iterator().
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl extends 
AbstractCollection implements QueueConsumers {
+
+   private final QueueConsumersIterator iterator = new 
QueueConsumersIterator<>(this, true);
+
+   private volatile Level[] levels;
+   private volatile int size;
+   private volatile T first;
+
+   private void setArray(Level[] array) {
+  this.levels = array;
+   }
+
+   private Level[] getArray() {
+  return levels;
+   }
+
+
+   public QueueConsumersImpl() {
+  levels = newLevelArrayInstance(0);
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Level[] newLevelArrayInstance(int length) {
+  return (Level[]) Array.newInstance(Level.class, length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   @Override
+   public Set getPriorites() {
+  Level[] levels = getArray();
+  return 
Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  return new QueueConsumersIterator<>(this, false);
+   }
+
+   @Override
+   public boolean hasNext() {
+  return iterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return iterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  iterator.reset();
+  return this;
+   }
+
+   @Override
+   public void forEach(Consumer action) {
+  Objects.requireNonNull(action);
+  Level[] current = getArray();
+  int len = current.length;
+  for (int i = 0; i < len; ++i) {
+ current[i].forEach(action);
+  }
+   }
+
+   private Level getLevel(int level, boolean createIfMis

[GitHub] activemq-artemis issue #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2488
  
@franz1981 alot of comments where why integer vs byte, whilst legacy 
ActiveMQ only supported 0-127 in open wire, many other brokers support integer 
for this feature, e.g. QPID for AMQP supports -2^31 to 2^31-1 like wise 
rabbitmq.

This is set on the consumer (its not per message) so size isn't an issue it 
makes sense to support the int and not constrain our selves un-neededly, making 
people migrating AMQP easier.


---


[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

2019-01-03 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2488#discussion_r245009418
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -95,6 +109,7 @@ public void encodeRest(final ActiveMQBuffer buffer) {
   buffer.writeNullableSimpleString(filterString);
   buffer.writeBoolean(browseOnly);
   buffer.writeBoolean(requiresResponse);
+  buffer.writeInt(priority);
--- End diff --

we plan to support int.


---


  1   2   3   4   5   6   7   8   9   10   >