[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246602226
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
 ---
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.IntFunction;
+
+/**
+ * This collection is a concurrent append-only list that grows in 
chunks.
+ * It's safe to be used by many threads concurrently and has a max 
capacity of {@link Integer#MAX_VALUE}.
+ */
+public final class ConcurrentAppendOnlyChunkedList {
--- End diff --

Thanks, guys! One day I will be better at naming things too, I swear :D


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246602031
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
--- End diff --

It's not that easy here: this one is not a common collection (in theory, 
though we can certainly agree on the contract). It is more of an append-only 
list that allows indexed queries, like a map.
If I remember correctly, it is similar to a [hashed array 
tree](https://en.wikipedia.org/wiki/Hashed_array_tree), where the top-level 
directory is a doubly linked list of "folders" (instead of an array, as in the 
original implementation): indeed, in the code there are chunkIndex (the key into 
the top-level directory) and offset (the key of a leaf within its folder).
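
To make the chunkIndex/offset addressing concrete, here is a minimal single-threaded sketch of a chunked append-only list whose "directory" is a doubly linked list of chunks. It is not the code from this pull request: the class `ChunkedListSketch`, the chunk size of 4 and every field name are assumptions made for illustration, and the lock-free machinery of the real class is left out entirely.

```java
import java.util.Objects;

// Hypothetical, single-threaded illustration; not the class from the pull request.
final class ChunkedListSketch<T> {

   // Assumed chunk size for the example: 4 elements (must be a power of 2).
   private static final int CHUNK_SIZE_LOG2 = 2;
   private static final int CHUNK_SIZE = 1 << CHUNK_SIZE_LOG2;
   private static final int CHUNK_MASK = CHUNK_SIZE - 1;

   // One "folder": a fixed-size array linked to its neighbours.
   private static final class Chunk {
      final int index;
      final Chunk prev;
      Chunk next;
      final Object[] items = new Object[CHUNK_SIZE];

      Chunk(int index, Chunk prev) {
         this.index = index;
         this.prev = prev;
      }
   }

   private final Chunk first = new Chunk(0, null);
   private Chunk last = first;
   private int size;

   void add(T element) {
      Objects.requireNonNull(element);
      final int offset = size & CHUNK_MASK;
      // The current chunk is full: link a new one at the end.
      if (offset == 0 && size != 0) {
         final Chunk chunk = new Chunk(last.index + 1, last);
         last.next = chunk;
         last = chunk;
      }
      last.items[offset] = element;
      size++;
   }

   @SuppressWarnings("unchecked")
   T get(int index) {
      if (index < 0 || index >= size) {
         return null;
      }
      final int chunkIndex = index >> CHUNK_SIZE_LOG2; // key into the linked directory
      final int offset = index & CHUNK_MASK;           // key of the leaf inside the chunk
      Chunk chunk;
      if (last.index - chunkIndex < chunkIndex) {
         // Fewer hops when walking backward from the last chunk.
         chunk = last;
         for (int i = last.index; i > chunkIndex; i--) {
            chunk = chunk.prev;
         }
      } else {
         // Otherwise walk forward from the first chunk.
         chunk = first;
         for (int i = 0; i < chunkIndex; i++) {
            chunk = chunk.next;
         }
      }
      return (T) chunk.items[offset];
   }

   int size() {
      return size;
   }
}
```

With ten elements appended, index 5 resolves to chunkIndex 1 and offset 1, reached with one forward hop from the first chunk, while index 9 resolves to chunkIndex 2 and offset 1, reached from the last chunk with no hops.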


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246601217
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/ConcurrentAppendOnlyChunkedList.java
 ---
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.IntFunction;
+
+/**
+ * This collection is a concurrent append-only list that grows in 
chunks.
+ * It's safe to be used by many threads concurrently and has a max 
capacity of {@link Integer#MAX_VALUE}.
+ */
+public final class ConcurrentAppendOnlyChunkedList {
--- End diff --

This is a lot better!


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246601169
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {
+   public void addLiveMessage(PagedMessage 

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r24662
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -28,15 +28,49 @@
 /**
  * This is the same as PageCache, however this is for the page that's 
being currently written.
  */
--- End diff --

@clebertsuconic @michaelandrepearce Good points, guys: I have re-implemented 
the logic in a proper collection (though it won't implement the canonical 
`Collection` types, because it is not canonical at all).


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246599701
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
--- End diff --

Eheh, you're right :P, but the code of this collection came from a 
version where `chunkSize` was not a static final constant; in the new version 
it is clearer why I have done it.
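
For readers following along, a small sketch of the arithmetic being discussed, assuming a chunk size that is a power of 2 and only known at construction time. The quoted diff only shows `chunkSizeLog2` and `chunkMask` being used; how they are derived here, and the class name `PowerOfTwoChunks`, are assumptions for illustration.

```java
// Hypothetical sketch; not taken from the pull request.
final class PowerOfTwoChunks {

   private final int chunkSizeLog2;
   private final int chunkMask;

   PowerOfTwoChunks(int chunkSize) {
      // A positive power of 2 has exactly one bit set.
      if (chunkSize <= 0 || Integer.bitCount(chunkSize) != 1) {
         throw new IllegalArgumentException("chunkSize must be a positive power of 2: " + chunkSize);
      }
      this.chunkSizeLog2 = Integer.numberOfTrailingZeros(chunkSize);
      this.chunkMask = chunkSize - 1;
   }

   // Same result as index / chunkSize for a non-negative index.
   int chunkIndex(int index) {
      return index >> chunkSizeLog2;
   }

   // Same result as index % chunkSize for a non-negative index.
   int offset(int index) {
      return index & chunkMask;
   }
}
```

The equivalence holds only for non-negative indexes, which matches the quoted `getMessage`, where negative message numbers are rejected before any shift is applied.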


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246578304
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
--- End diff --

If this were a collection, this would almost be throwing an illegal 
argument exception.
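
A sketch of the two contracts being compared, using a plain `java.util.List` as a stand-in for the page cache. Both helper methods and the class name `OutOfRangeContracts` are hypothetical. Note that `java.util.List#get` itself signals this case with `IndexOutOfBoundsException` rather than `IllegalArgumentException`.

```java
import java.util.List;

// Hypothetical sketch; neither method is taken from the pull request.
final class OutOfRangeContracts {

   // Contract kept by the quoted getMessage: an index outside [0, size) yields null.
   static <T> T getOrNull(List<T> items, int index) {
      if (index < 0 || index >= items.size()) {
         return null;
      }
      return items.get(index);
   }

   // Contract a general-purpose collection would more likely follow: fail loudly.
   static <T> T getOrThrow(List<T> items, int index) {
      if (index < 0 || index >= items.size()) {
         throw new IndexOutOfBoundsException("index " + index + ", size " + items.size());
      }
      return items.get(index);
   }
}
```

Returning null lets a caller poll for a message that has not been appended yet, at the cost of hiding genuinely wrong indexes. Throwing does the opposite.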


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246577156
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
--- End diff --

cough ;) ... for the same comment you left me  :P


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246574781
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  ResettableIterator[] 
newResettableIteratorArrayInstance(int length) {
+  

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246574775
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {
+   public void addLiveMessage(PagedMessage message) {

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246570552
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
 ---
@@ -864,21 +875,32 @@ private PageCursorInfo processACK(final PagePosition 
pos) {
   return info;
}
 
+   private void installTXCallback(final Transaction tx, final PagePosition 
position) {
+  installTXCallback(tx, position, -1);
+   }
+
/**
 * @param tx
 * @param position
+* @param persistentSize if negative it needs to be calculated on the 
fly
 */
-   private void installTXCallback(final Transaction tx, final PagePosition 
position) {
+   private void installTXCallback(final Transaction tx, final PagePosition 
position, final long persistentSize) {
   if (position.getRecordID() >= 0) {
  // It needs to persist, otherwise the cursor will return to the 
fist page position
  tx.setContainsPersistent();
   }
 
   PageCursorInfo info = getPageInfo(position);
   PageCache cache = info.getCache();
-  long size = 0;
   if (cache != null) {
- size = 
getPersistentSize(cache.getMessage(position.getMessageNr()));
+ final long size;
+ if (persistentSize < 0) {
--- End diff --

-1 is used as a reserved value elsewhere to trigger the cache lookup.


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246570293
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
 ---
@@ -864,21 +875,32 @@ private PageCursorInfo processACK(final PagePosition 
pos) {
   return info;
}
 
+   private void installTXCallback(final Transaction tx, final PagePosition 
position) {
+  installTXCallback(tx, position, -1);
+   }
+
/**
 * @param tx
 * @param position
+* @param persistentSize if negative it needs to be calculated on the 
fly
 */
-   private void installTXCallback(final Transaction tx, final PagePosition 
position) {
+   private void installTXCallback(final Transaction tx, final PagePosition 
position, final long persistentSize) {
   if (position.getRecordID() >= 0) {
  // It needs to persist, otherwise the cursor will return to the 
fist page position
  tx.setContainsPersistent();
   }
 
   PageCursorInfo info = getPageInfo(position);
   PageCache cache = info.getCache();
-  long size = 0;
   if (cache != null) {
- size = 
getPersistentSize(cache.getMessage(position.getMessageNr()));
+ final long size;
+ if (persistentSize < 0) {
--- End diff --

Surely this is checking for something like -1, not just any negative value 
(which would be worrying)? If so, the check should be explicit and test only 
for -1; any other negative value should mean an illegal argument.
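
A minimal sketch of what such an explicit check could look like. The constant name `SIZE_NOT_PROVIDED`, the class `PersistentSizeCheck` and the `LongSupplier` standing in for the cache lookup are all hypothetical; the quoted diff passes a bare `-1` and tests `persistentSize < 0`.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch; not the code from PageSubscriptionImpl.
final class PersistentSizeCheck {

   // Assumed name for the reserved value that triggers the lookup.
   static final long SIZE_NOT_PROVIDED = -1;

   static long resolveSize(long persistentSize, LongSupplier cacheLookup) {
      if (persistentSize == SIZE_NOT_PROVIDED) {
         // Only the reserved value falls back to computing the size on the fly.
         return cacheLookup.getAsLong();
      }
      if (persistentSize < 0) {
         // Any other negative value is a caller error.
         throw new IllegalArgumentException("persistentSize must be >= 0 or " + SIZE_NOT_PROVIDED + ": " + persistentSize);
      }
      return persistentSize;
   }
}
```

Under this sketch a stray negative size surfaces immediately as an exception instead of silently triggering a cache lookup.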


---


[GitHub] activemq-artemis pull request #2483: ARTEMIS-2215 largemessage have been con...

2019-01-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/activemq-artemis/pull/2483


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246569626
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
--- End diff --

I know, but in the original version it was handled that way, and it covers 
two cases: the collection is null and the collection doesn't have enough elements.


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246569523
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {
+   public void addLiveMessage(PagedMessage 

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568743
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
--- End diff --

Surely if this occurs, there's some issue.


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568579
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
--- End diff --

The naming of fields is misaligned: in some places it's called size, in others 
you call it index. This is hard to follow.


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568552
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {
+   public void addLiveMessage(PagedMessage message) {

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246568367
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {
+   public void addLiveMessage(PagedMessage message) {

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246567995
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {
+   public void addLiveMessage(PagedMessage 

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246567905
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -28,15 +28,49 @@
 /**
  * This is the same as PageCache, however this is for the page that's 
being currently written.
  */
--- End diff --

Makes sense :+1: 


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246566042
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -48,54 +82,228 @@ public long getPageId() {
}
 
@Override
-   public synchronized int getNumberOfMessages() {
-  return messages.size();
+   public int getNumberOfMessages() {
+  while (true) {
+ final long size = producerIndex;
+ if (size == RESIZING) {
+Thread.yield();
+continue;
+ }
+ return (int) Math.min(size, Integer.MAX_VALUE);
+  }
}
 
@Override
-   public synchronized void setMessages(PagedMessage[] messages) {
+   public void setMessages(PagedMessage[] messages) {
   // This method shouldn't be called on liveCache, but we will provide 
the implementation for it anyway
   for (PagedMessage msg : messages) {
  addLiveMessage(msg);
   }
}
 
@Override
-   public synchronized PagedMessage getMessage(int messageNumber) {
-  if (messageNumber < messages.size()) {
- return messages.get(messageNumber);
-  } else {
+   public PagedMessage getMessage(int messageNumber) {
+  if (messageNumber < 0) {
  return null;
   }
+  //it allow to perform less cache invalidations vs producerIndex if 
there are bursts of appends
+  long size = lastSeenProducerIndex;
+  if (messageNumber >= size) {
+ while ((size = producerIndex) == RESIZING) {
+Thread.yield();
+ }
+ //it is a message over the current size?
+ if (messageNumber >= size) {
+return null;
+ }
+ //publish it for others consumers
+ LAST_PRODUCER_INDEX_UPDATER.lazySet(this, size);
+  }
+  final AtomicChunk buffer;
+  final int offset;
+  if (messageNumber >= chunkSize) {
+ offset = messageNumber & chunkMask;
+ //slow path is moved in a separate method
+ buffer = jump(messageNumber, size);
+  } else {
+ offset = messageNumber;
+ buffer = consumerBuffer;
+  }
+  //NOTE: producerIndex is being updated before setting a new value ie 
on consumer side need to spin until a not null value is set
+  PagedMessage msg;
+  while ((msg = buffer.get(offset)) == null) {
+ Thread.yield();
+  }
+  return msg;
+   }
+
+   /**
+* Implements a lock-free version of the optimization used on {@link 
java.util.LinkedList#get(int)} to speed up queries
+* ie backward search of a node if needed.
+*/
+   private AtomicChunk jump(final int messageNumber, final 
long size) {
+  //fast division by a power of 2
+  final int jumps = messageNumber >> chunkSizeLog2;
+  //size is never allowed to be > Integer.MAX_VALUE
+  final int lastChunkIndex = (int) size >> chunkSizeLog2;
+  int requiredJumps = jumps;
+  AtomicChunk jumpBuffer = null;
+  boolean jumpForward = true;
+  int distanceFromLastChunkIndex = lastChunkIndex - jumps;
+  //it's worth to go backward from lastChunkIndex?
+  //trying first to check against the value we already have: if it 
won't worth, won't make sense to load the producerBuffer
+  if (distanceFromLastChunkIndex < jumps) {
+ final AtomicChunk producer = producerBuffer;
+ //producer is a potential moving, always increasing, target ie 
better to re-check the distance
+ distanceFromLastChunkIndex = producer.index - jumps;
+ if (distanceFromLastChunkIndex < jumps) {
+//we're saving some jumps ie is fine to go backward from here
+jumpBuffer = producer;
+requiredJumps = distanceFromLastChunkIndex;
+jumpForward = false;
+ }
+  }
+  //start from the consumer buffer only is needed
+  if (jumpBuffer == null) {
+ jumpBuffer = consumerBuffer;
+  }
+  for (int i = 0; i < requiredJumps; i++) {
+ //next chunk is always set if below a read producerIndex value
+ //previous chunk is final and can be safely read
+ jumpBuffer = jumpForward ? jumpBuffer.next : jumpBuffer.prev;
+  }
+  return jumpBuffer;
}
 
@Override
-   public synchronized boolean isLive() {
+   public boolean isLive() {
   return isLive;
}
 
@Override
-   public synchronized void addLiveMessage(PagedMessage message) {
+   public void addLiveMessage(PagedMessage 

[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246563763
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -28,15 +28,49 @@
 /**
  * This is the same as PageCache, however this is for the page that's 
being currently written.
  */
--- End diff --

@franz1981 I'd agree with @clebertsuconic here. It's a bit like what I've done 
with priority consumers: I ended up splitting out the collections logic, which 
has made it cleaner and easier to reason about (and, as you've pointed 
out on its PR, more testable ;) ).


---


[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...

2019-01-09 Thread jbertram
Github user jbertram commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2466#discussion_r246563323
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTQueueCleanTest.java
 ---
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+public class MQTTQueueCleanTest extends MQTTTestSupport {
+
+   private static final ActiveMQServerLogger log = 
ActiveMQServerLogger.LOGGER;
+
+   @Test
+   public void testQueueCleanWhenConnectionSynExeConnectAndDisconnect() 
throws Exception {
+  Random random = new Random();
+  Set clientProviders = new HashSet<>(11);
+  int repeatCount = 0;
+  String address = "clean/test";
+  String clientId = "sameClientId";
+  String queueName = "::sameClientId.clean.test";
+  //The abnormal scene does not necessarily occur, repeating 100 times 
to ensure the recurrence of the abnormality
+  while (repeatCount < 100) {
+ repeatCount++;
+ int subConnectionCount = random.nextInt(50) + 1;
+ int sC = 0;
+ try {
+//Reconnect at least twice to reproduce the problem
+while (sC < subConnectionCount) {
+   sC++;
+   MQTTClientProvider clientProvider = getMQTTClientProvider();
+   clientProvider.setClientId(clientId);
+   initializeConnection(clientProvider);
+   clientProviders.add(clientProvider);
+   clientProvider.subscribe(address, AT_LEAST_ONCE);
+}
+ } finally {
+for (MQTTClientProvider clientProvider : clientProviders) {
+   clientProvider.disconnect();
+}
+clientProviders.clear();
+assertTrue(waitForBindings(server, queueName, false, 0, 0, 
1));
--- End diff --

From what I can tell the overridden version of `waitForBindings` isn't 
necessary. You could just use something like:

`assertTrue(Wait.waitFor(() -> 
server.locateQueue(SimpleString.toSimpleString(queueName)) == null));`
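
The suggestion above relies on a poll-until-true helper. As a rough illustration of that pattern only, here is a minimal sketch; `PollingWait` and its parameters are hypothetical names made up for this example, not Artemis's actual `Wait` class, whose signature and defaults may differ.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a test utility in the style of Wait.waitFor.
// The class name, parameters and timeouts are assumptions for illustration.
final class PollingWait {

   private PollingWait() {
   }

   // Polls the condition until it holds (returns true) or the timeout elapses (returns false).
   static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long sleepMillis) {
      final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
      while (true) {
         if (condition.getAsBoolean()) {
            return true;
         }
         if (System.nanoTime() - deadline >= 0) {
            return false;
         }
         try {
            Thread.sleep(sleepMillis);
         } catch (InterruptedException e) {
            // restore the interrupt flag and give up waiting
            Thread.currentThread().interrupt();
            return false;
         }
      }
   }
}
```

In a test, the condition would be the lambda from the comment above (the queue lookup returning null), so no dedicated `waitForBindings` override is needed.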


---


[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...

2019-01-09 Thread jbertram
Github user jbertram commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2466#discussion_r246562512
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 ---
@@ -3604,4 +3605,14 @@ private void 
deployReloadableConfigFromConfiguration() throws Exception {
   return externalComponents;
}
 
+   @Override
+   public Set queueConsumersQuery(SimpleString queueName) {
--- End diff --

This method isn't necessary. You can use the `locateQueue` method and 
simply invoke `getConsumers` on the returned value.


---


[GitHub] activemq-artemis pull request #2489: ARTEMIS-2220 Fix PageCursorStressTest::...

2019-01-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/activemq-artemis/pull/2489


---


[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...

2019-01-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/activemq-artemis/pull/2491


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2494#discussion_r246558406
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/LivePageCacheImpl.java
 ---
@@ -28,15 +28,49 @@
 /**
  * This is the same as PageCache, however this is for the page that's 
being currently written.
  */
--- End diff --

Isn't this mixing the collection implementation itself into the 
LivePageCache?

Isn't there a way to implement this logic in its own structure? Like 
PageCache using a generic ChunkArray (a name I just came up with here)?

I'm a bit concerned about maintaining the business side of this issue (that is, 
the PageCache) together with the speedy implementation of a collection.
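
To make the proposed separation concrete, here is a minimal sketch under loud assumptions: it is single-threaded, it deliberately leaves out the lock-free machinery of the PR, and `ChunkedAppendOnlyList` and `SketchLivePageCache` are hypothetical names invented for this example (messages are plain `String`s rather than `PagedMessage`). It only shows the shape of the split: the collection owns the chunk arithmetic, the cache delegates to it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded sketch of a generic append-only chunked list.
// The concurrency handling from the PR is intentionally omitted.
final class ChunkedAppendOnlyList<T> {

   private final int chunkSizeLog2;
   private final int chunkMask;
   private final List<Object[]> chunks = new ArrayList<>();
   private int size;

   ChunkedAppendOnlyList(int chunkSizeLog2) {
      this.chunkSizeLog2 = chunkSizeLog2;
      this.chunkMask = (1 << chunkSizeLog2) - 1;
   }

   void add(T element) {
      final int offset = size & chunkMask;
      if (offset == 0) {
         // the previous chunk is full (or there is none yet): append a new one
         chunks.add(new Object[chunkMask + 1]);
      }
      // division by a power of 2 via shift selects the chunk
      chunks.get(size >> chunkSizeLog2)[offset] = element;
      size++;
   }

   @SuppressWarnings("unchecked")
   T get(int index) {
      // out-of-range requests return null, mirroring the getMessage contract in the diff
      if (index < 0 || index >= size) {
         return null;
      }
      return (T) chunks.get(index >> chunkSizeLog2)[index & chunkMask];
   }

   int size() {
      return size;
   }
}

// The cache keeps only the paging concerns and delegates storage to the collection.
final class SketchLivePageCache {

   // chunk size of 4 (log2 = 2) is an arbitrary value for the example
   private final ChunkedAppendOnlyList<String> messages = new ChunkedAppendOnlyList<>(2);

   void addLiveMessage(String message) {
      messages.add(message);
   }

   String getMessage(int messageNumber) {
      return messages.get(messageNumber);
   }

   int getNumberOfMessages() {
      return messages.size();
   }
}
```

With this split the collection can be tested on its own, without any paging types involved.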


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246558298
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
--- End diff --

currentIterator is ONLY updated by reset, which should ONLY be called by 
the same thread that operates on the ResettableIterator interface.

When a consumer is added or removed, a new iterator is parked in the 
volatile changedIterator (using an atomic field updater), so the next reset 
can pick it up.
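
As a rough illustration of that handoff, here is a minimal sketch. It is not the PR's class: ParkedViewSketch, park, changedView and currentView are hypothetical names, and a String[] stands in for the real iterator. It assumes a single reader thread calls reset() while any thread may park a new snapshot.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/** Hypothetical, simplified stand-in for the reset/park handoff; not the PR's class. */
final class ParkedViewSketch {

   private static final AtomicReferenceFieldUpdater<ParkedViewSketch, String[]> CHANGED =
      AtomicReferenceFieldUpdater.newUpdater(ParkedViewSketch.class, String[].class, "changedView");

   // Written by any thread that adds or removes an element.
   private volatile String[] changedView;

   // Assumption: touched only by the single thread that iterates and resets.
   private String[] currentView = new String[0];

   /** Writer side: park a fresh snapshot for the reader to pick up later. */
   void park(String... snapshot) {
      CHANGED.set(this, snapshot.clone());
   }

   /** Reader side: swap in the parked snapshot, if any, and return the view in use. */
   String[] reset() {
      String[] parked = changedView;
      if (parked != null) {
         currentView = parked;
         // Clear the slot only if no newer snapshot was parked in the meantime.
         CHANGED.compareAndSet(this, parked, null);
      }
      return currentView;
   }

   public static void main(String[] args) {
      ParkedViewSketch sketch = new ParkedViewSketch();
      System.out.println(sketch.reset().length);
      sketch.park("c1", "c2");
      System.out.println(sketch.reset().length);
   }
}
```

The reader keeps working on its current view until it chooses to reset, so the hot read path never touches the volatile field between resets.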



---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246557671
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  ResettableIterator[] 
newResettableIteratorArrayInstance(int length) {
+  return 

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246555304
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
+  }
+  return this;
+   }
+
+   @Override
+   public boolean add(T t) {
+  boolean result = consumers.add(t);
+  if (result) {
+ changedIteratorFieldUpdater.set(this, 
consumers.resettableIterator());
--- End diff --

see other comment


---


[GitHub] activemq-artemis pull request #2492: ARTEMIS-2222 why the position remains u...

2019-01-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/activemq-artemis/pull/2492


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246551345
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
--- End diff --

Tomorrow I will take another look, but it doesn't seem correct to me if 
this code can be run by different threads: currentIterator is not guarded by 
any atomic operation that would safely publish it, and changedIterator is 
volatile-loaded 3 times before hitting the case...
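
One possible consequence of re-reading the volatile field, offered only as an illustration and not as something stated in the review, can be stepped through by hand. The sketch below uses an AtomicReference in place of the volatile field plus updater, and plain strings "A" and "B" in place of iterators; all names are hypothetical. The writer's update is placed between the reader's loads on purpose.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hand-stepped interleaving with hypothetical names; not code from the PR. */
final class StaleExpectedValueSketch {

   /** The CAS re-reads the shared reference to obtain its expected value. */
   static String rereadForCas() {
      AtomicReference<String> parked = new AtomicReference<>("A");
      String current = parked.get();              // reader installs A
      parked.set("B");                            // writer parks B in between
      parked.compareAndSet(parked.get(), null);   // fresh read sees B, so B is cleared
      return current + "/" + parked.get();
   }

   /** The CAS uses the value captured by the single earlier read. */
   static String readOnce() {
      AtomicReference<String> parked = new AtomicReference<>("A");
      String seen = parked.get();                 // one read, kept in a local
      String current = seen;                      // reader installs A
      parked.set("B");                            // writer parks B in between
      parked.compareAndSet(seen, null);           // expected A, found B: B stays parked
      return current + "/" + parked.get();
   }

   public static void main(String[] args) {
      System.out.println(rereadForCas());
      System.out.println(readOnce());
   }
}
```

In the first variant the newer value is dropped without ever becoming current; in the second it stays parked for the next reset. Whether this interleaving can occur in the PR depends on which threads call reset, add and remove.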


---


[GitHub] activemq-artemis pull request #2492: ARTEMIS-2222 why the position remains u...

2019-01-09 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2492#discussion_r246538095
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
 ---
@@ -1315,9 +1315,7 @@ private PagedReference moveNext() {
   }
}
 
-   if (!ignored) {
-  position = message.getPosition();
-   }
+   position = message.getPosition();
--- End diff --

I think this made sense at some point, but after a few later fixes it's 
harmless.

I don't think this needs a JIRA, as I don't think there's an issue.

I will merge and keep the JIRA, but I will rename the commit from a 
question into something more affirmative.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246525755
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
--- End diff --

The compare-and-set is there because a new iterator could be created whilst 
we are mid-reset. The iterator is for single-thread usage only. Updating the 
collection should be thread safe and able to occur in parallel.

This is much like iterating a CopyOnWriteArrayList. The iterator works on a 
snapshot view and is single-threaded. Updating the collection itself can be 
concurrent and can occur whilst someone else is iterating.
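
The CopyOnWriteArrayList behaviour referred to above can be seen in a small standalone sketch. The class and method names are hypothetical; only java.util classes are used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Shows that a CopyOnWriteArrayList iterator keeps the snapshot it was created on. */
final class SnapshotIterationSketch {

   /** Creates an iterator, modifies the list, then drains the iterator. */
   static List<String> drainAfterAdd() {
      List<String> consumers = new CopyOnWriteArrayList<>(Arrays.asList("c1", "c2"));
      Iterator<String> it = consumers.iterator();   // bound to the current backing array
      consumers.add("c3");                          // copies the array; the iterator is unaffected
      List<String> seen = new ArrayList<>();
      while (it.hasNext()) {
         seen.add(it.next());
      }
      return seen;
   }

   public static void main(String[] args) {
      System.out.println(drainAfterAdd());
   }
}
```

The element added after the iterator was created is not visited, and no ConcurrentModificationException is thrown; a new iterator would be needed to see it.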


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246523218
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiIterator implements Iterator {
+
+   private final Iterator[] iterators;
+   int index = -1;
--- End diff --

Agreed


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246521520
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiIterator implements Iterator {
--- End diff --

Yeah, I had a version like that, but it makes the simple MultiIterator ugly 
to use, needlessly. And extending it, if needed, only requires a cast, as in 
the resettable case.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520970
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+/**
+ * Extends MultiIterator, adding the ability if the underlying iterators 
are resettable, then its self can reset.
+ * It achieves this by going back to the first iterator, and as moves to 
another iterator it resets it.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiResettableIterator extends MultiIterator 
implements ResettableIterator {
--- End diff --

You could, but that then makes using MultiIterator ugly as hell with little 
gain.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520411
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  ResettableIterator[] 
newResettableIteratorArrayInstance(int length) {
+  

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520138
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  ResettableIterator[] 
newResettableIteratorArrayInstance(int length) {
+  

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520110
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
--- End diff --

So that is a good reason to avoid using a `compareAndSet`, which is not only 
expensive but also gives the false illusion that the code is thread-safe.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520121
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  ResettableIterator[] 
newResettableIteratorArrayInstance(int length) {
+  

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246518747
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
--- End diff --

Nice


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246518669
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -84,6 +94,10 @@ public void setFilterString(SimpleString filterString) {
   this.filterString = filterString;
}
 
+   public void setPriority(byte priority) {
--- End diff --

Keeping in line with the rest of the fields and the general approach. I'd rather 
keep to the status quo.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246518313
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
--- End diff --

If it fails, it means that while we were resetting and switching the reference to 
the new iterator, a consumer was added or removed and another new iterator was 
created (this can occur while we are iterating or resetting). It simply means 
that although this reset updated its iterator, the next reset needs to switch 
over again. We wouldn't want to lazy set it to null.
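
To make the failure case concrete, here is a minimal sketch of the hand-off, not the code in the PR. The class `IteratorHandoffSketch`, its `publish` method, the `interleave` hook and the `String` "views" are all hypothetical stand-ins; the sketch also uses an `AtomicReference` and reads the slot once into a local, which is an assumption and differs from the quoted diff.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the changedIterator/currentIterator pair.
// A String plays the role of an iterator view to keep the sketch short.
final class IteratorHandoffSketch {

   // Written by add/remove from any thread, consumed by reset().
   private final AtomicReference<String> changed = new AtomicReference<>();

   // Only ever touched by the single thread that iterates.
   private String current = "v0";

   // What add/remove would do after a successful modification.
   void publish(String newerView) {
      changed.set(newerView);
   }

   // interleave simulates an add/remove landing between adopting the new
   // view and clearing the slot. useCas selects the clearing strategy.
   String reset(Runnable interleave, boolean useCas) {
      String seen = changed.get();
      if (seen != null) {
         current = seen;
         interleave.run();
         if (useCas) {
            // Clears the slot only if it still holds the view just adopted.
            // If it fails, a newer view is waiting for the next reset.
            changed.compareAndSet(seen, null);
         } else {
            // Unconditional clear: a newer view published meanwhile is lost.
            changed.set(null);
         }
      }
      return current;
   }
}
```

With `useCas` set to true, a view published during a reset is picked up by the following reset; with the unconditional clear it is silently dropped and the iterating thread keeps the stale view until the next modification.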


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246516870
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
--- End diff --

Like any iterator, this iterator should only be used by one thread at a 
time.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246516479
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
+  }
+  return this;
+   }
+
+   @Override
+   public boolean add(T t) {
+  boolean result = consumers.add(t);
+  if (result) {
+ changedIteratorFieldUpdater.set(this, 
consumers.resettableIterator());
+  }
+  return result;
+   }
+
+   @Override
+   public boolean remove(T t) {
+  boolean result = consumers.remove(t);
+  if (result) {
+ changedIteratorFieldUpdater.set(this, 
consumers.resettableIterator());
--- End diff --

Agreed. But this is not the hot path, and if anything it would allow consumers 
to be added or removed concurrently in future, without the existing big sync 
blocks that are in QueueImpl today.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246515810
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ---
@@ -2409,14 +2410,10 @@ private void deliver() {
   // Either the iterator is empty or the consumer is busy
   int noDelivery = 0;
 
-  int size = 0;
-
-  int endPos = -1;
-
   int handled = 0;
 
   long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
-
+  consumers.reset();
--- End diff --

It's fine. We actually only want to reset after a successful handled, a no-match, 
or an expired reference, or before we start iterating.
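
As a rough illustration of that reset pattern, here is a sketch under stated assumptions rather than the PR's code. `ResetLoopSketch`, `CircularIterator` and `deliver` are hypothetical names, and the circular semantics of `reset()` (a new pass that starts at the current position and offers each element once) are assumed for the example. Only the "handled" case is modelled; no-match and expiry are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

final class ResetLoopSketch {

   // Hypothetical circular iterator: reset() opens a new pass that begins at
   // the current position and ends once every element has been offered once.
   static final class CircularIterator<T> {
      private final List<T> items;
      private int cursor;
      private int remaining;

      CircularIterator(List<T> items) {
         this.items = items;
      }

      void reset() {
         remaining = items.size();
      }

      boolean hasNext() {
         return remaining > 0;
      }

      T next() {
         if (remaining == 0) {
            throw new NoSuchElementException();
         }
         T item = items.get(cursor);
         cursor = (cursor + 1) % items.size();
         remaining--;
         return item;
      }
   }

   // Offers each message to consumers in turn, skipping the busy one. The pass
   // is reset before starting and again after every successful hand-off.
   static List<String> deliver(List<String> messages, List<String> consumers, String busyConsumer) {
      List<String> log = new ArrayList<>();
      CircularIterator<String> it = new CircularIterator<>(consumers);
      it.reset(); // before starting to iterate
      for (String message : messages) {
         while (it.hasNext()) {
            String consumer = it.next();
            if (!consumer.equals(busyConsumer)) {
               log.add(message + "->" + consumer);
               it.reset(); // handled: the next message gets a full pass
               break;
            }
         }
      }
      return log;
   }
}
```

Because a busy consumer does not trigger a reset, a pass in which every consumer is busy simply runs out, and the loop stops offering messages until something resets the iterator again.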


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246515455
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ---
@@ -2497,45 +2484,36 @@ private void deliver() {
 
 
   handled++;
-
+  consumers.reset();
   continue;
}
 
if (logger.isTraceEnabled()) {
   logger.trace("Queue " + this.getName() + " is delivering 
reference " + ref);
}
 
-   // If a group id is set, then this overrides the consumer 
chosen round-robin
-
-   SimpleString groupID = extractGroupID(ref);
-
-   if (groupID != null) {
-  groupConsumer = groups.get(groupID);
+   final SimpleString groupID = extractGroupID(ref);
+   groupConsumer = getGroupConsumer(groupConsumer, groupID);
 
-  if (groupConsumer != null) {
- consumer = groupConsumer;
-  }
-   }
-
-   if (exclusive && redistributor == null) {
-  consumer = consumerList.get(0).consumer;
+   if (groupConsumer != null) {
+  consumer = groupConsumer;
}
 
HandleStatus status = handle(ref, consumer);
 
if (status == HandleStatus.HANDLED) {
 
-  deliveriesInTransit.countUp();
-
-  handledconsumer = consumer;
-
-  removeMessageReference(holder, ref);
-
   if (redistributor == null) {
  handleMessageGroup(ref, consumer, groupConsumer, 
groupID);
   }
 
+  deliveriesInTransit.countUp();
+
+
+  removeMessageReference(holder, ref);
+  handledconsumer = consumer;
   handled++;
+  consumers.reset();
--- End diff --

Yes, it's fine. It would have no negative effect, and it actually has the same 
behaviour as the old code.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246513536
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  ResettableIterator[] 
newResettableIteratorArrayInstance(int length) {
+  

[GitHub] activemq-artemis pull request #:

2019-01-09 Thread clebertsuconic
Github user clebertsuconic commented on the pull request:


https://github.com/apache/activemq-artemis/commit/0f9bf15788096ad9dc795954174c3b496861932c#commitcomment-31886537
  
In 
artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
 on line 347:
@andytaylor I had a lot of headaches here: weeks of testing this while 
sitting close to a production/test environment. I would double-check this 
carefully. This kind of thing is hard to test.


---


[GitHub] activemq-artemis pull request #2485: ARTEMIS-2217 ‘MQTTSessionState’ in ...

2019-01-09 Thread onlyMIT
Github user onlyMIT closed the pull request at:

https://github.com/apache/activemq-artemis/pull/2485


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246416127
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 5);
+  AmqpReceiver receiver1 = session.createReceiver(getQueueName(), 
null, false, false, properties1);
+  receiver1.flow(100);
+
+  Map properties2 = new HashMap<>();
+  properties2.put(Symbol.getSymbol("priority"), 50);
+  AmqpReceiver receiver2 = session.createReceiver(getQueueName(), 
null, false, false, properties2);
+  receiver2.flow(100);
+
+  Map properties3 = new HashMap<>();
+  properties3.put(Symbol.getSymbol("priority"), 10);
+  AmqpReceiver receiver3 = session.createReceiver(getQueueName(), 
null, false, false, properties3);
+  receiver3.flow(100);
+
+  sendMessages(getQueueName(), 5);
+
+
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message1 = receiver1.receiveNoWait();
--- End diff --

I believe the test client's receiveNoWait only polls its local queue, so 
this is now likely to fail sporadically due to racing the deliveries. I was 
only suggesting receiveNoWait as a potential initial verification within the 
loop for the non-receiving consumers, to be followed up or replaced by a more 
stringent final check.
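
A minimal sketch of the race described above, using a plain 
`java.util.concurrent.BlockingQueue` as a stand-in for the client's local 
queue. The class and method names here are hypothetical and are not the test 
client's API; the point is only that a non-blocking poll can observe an empty 
local queue while a delivery is still in flight, whereas a bounded wait 
does not.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a receiver that buffers deliveries in a local queue.
final class LocalQueueSketch {

   // Non-blocking: returns null if nothing has been delivered locally yet,
   // even if a message is already on its way from the broker.
   static String receiveNoWait(BlockingQueue<String> localQueue) {
      return localQueue.poll();
   }

   // Bounded wait: gives an in-flight delivery time to arrive before giving up.
   static String receive(BlockingQueue<String> localQueue, long timeoutMillis) throws InterruptedException {
      return localQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
   }
}
```

Under that assumption, `receiveNoWait` is only safe for asserting that a 
consumer has received nothing so far, and a final check still needs either a 
bounded wait or some other signal that delivery has completed.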


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246398611
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -104,6 +119,11 @@ public void decodeRest(final ActiveMQBuffer buffer) {
   filterString = buffer.readNullableSimpleString();
   browseOnly = buffer.readBoolean();
   requiresResponse = buffer.readBoolean();
+  if (buffer.readableBytes() > 0) {
--- End diff --

Does this comment cover the bit about the safety of always sending the new 
additional data, even to old servers? I can't tell if it's covered.

I think it should at the very least be commented what changed in the 
encoding and decoding handling, and when, so folks can understand the 
implications later without having to dig through past commits.
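
For illustration, the decoding side of this pattern can be sketched with a 
plain `java.nio.ByteBuffer`. This is not the ActiveMQBuffer API, and the 
class name, field layout and default value below are assumptions. It shows 
only how a newer reader tolerates a frame written without the trailing field; 
it says nothing about how an older reader reacts to the extra bytes, which is 
the open question above.

```java
import java.nio.ByteBuffer;

// Hypothetical codec: two boolean flags followed by an optional trailing int.
final class ConsumerMessageCodec {

   // Assumed default used when the sender did not encode a priority.
   static final int DEFAULT_PRIORITY = 0;

   // Pass priority == null to mimic an older sender that omits the field.
   static ByteBuffer encode(boolean browseOnly, boolean requiresResponse, Integer priority) {
      ByteBuffer buffer = ByteBuffer.allocate(priority == null ? 2 : 6);
      buffer.put((byte) (browseOnly ? 1 : 0));
      buffer.put((byte) (requiresResponse ? 1 : 0));
      if (priority != null) {
         buffer.putInt(priority);
      }
      buffer.flip();
      return buffer;
   }

   // Reads the trailing field only if the sender actually wrote it.
   static int decodePriority(ByteBuffer buffer) {
      buffer.get(); // browseOnly
      buffer.get(); // requiresResponse
      return buffer.hasRemaining() ? buffer.getInt() : DEFAULT_PRIORITY;
   }
}
```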


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246403272
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java
 ---
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+
+import 
org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Before;
+import org.junit.Test;
+
+public class QueueConsumerPriorityTest extends BasicOpenWireTest {
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+  super.setUp();
+  this.makeSureCoreQueueExist("QUEUE.A");
+   }
+   @Test
+   public void testQueueConsumerPriority() throws JMSException, 
InterruptedException {
+  connection.start();
+  Session consumerLowPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  Session consumerHighPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  assertNotNull(consumerHighPriority);
+  Session senderSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  String queueName = "QUEUE.A";
+  ActiveMQQueue low = new ActiveMQQueue(queueName + 
"?consumer.priority=1");
+  MessageConsumer lowConsumer = 
consumerLowPriority.createConsumer(low);
+
+  ActiveMQQueue high = new ActiveMQQueue(queueName + 
"?consumer.priority=2");
+  MessageConsumer highConsumer = 
consumerLowPriority.createConsumer(high);
+
+  ActiveMQQueue senderQueue = new ActiveMQQueue(queueName);
+
+  MessageProducer producer = senderSession.createProducer(senderQueue);
+
+  Message msg = senderSession.createTextMessage("test");
+  for (int i = 0; i < 1000; i++) {
+ producer.send(msg);
+ assertNotNull("null on iteration: " + i, 
highConsumer.receive(1000));
+  }
+  assertNull(lowConsumer.receive(2000));
--- End diff --

I don't personally think this is a case which warrants keeping poor 
behaviour 'for consistency' when there are various essentially equivalent 
checks/assertions the test could do which don't require wasting 2 seconds. 
Having maybe run it once to ensure it worked, I'd change it.

The ActiveMQ 5 test suite is an even better example of a test suite so slow 
(due to things like this) that folks don't actually want to run it.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246383235
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -113,6 +133,7 @@ public int hashCode() {
   result = prime * result + (browseOnly ? 1231 : 1237);
   result = prime * result + ((filterString == null) ? 0 : 
filterString.hashCode());
   result = prime * result + (int) (id ^ (id >>> 32));
+  result = prime * result + priority;
--- End diff --

I would use `Integer.hashCode(priority)` here, with the intent of 
`agitating` the value 
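
A small sketch of what that would look like, on a hypothetical class that 
models only the fields visible in the diff. One caveat worth stating: in the 
JDK, `Integer.hashCode(int)` returns its argument unchanged, so the result is 
the same as adding `priority` directly and the change is one of style rather 
than of bit mixing.

```java
// Hypothetical stand-in for the message class; only hashing-relevant fields are modelled.
final class ConsumerKey {
   private final long id;
   private final boolean browseOnly;
   private final int priority;

   ConsumerKey(long id, boolean browseOnly, int priority) {
      this.id = id;
      this.browseOnly = browseOnly;
      this.priority = priority;
   }

   @Override
   public int hashCode() {
      final int prime = 31;
      int result = 1;
      result = prime * result + Boolean.hashCode(browseOnly); // 1231 for true, 1237 for false
      result = prime * result + Long.hashCode(id);            // (int) (id ^ (id >>> 32))
      result = prime * result + Integer.hashCode(priority);   // returns priority unchanged
      return result;
   }

   @Override
   public boolean equals(Object obj) {
      if (this == obj) {
         return true;
      }
      if (!(obj instanceof ConsumerKey)) {
         return false;
      }
      ConsumerKey other = (ConsumerKey) obj;
      return id == other.id && browseOnly == other.browseOnly && priority == other.priority;
   }
}
```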


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246382311
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  ResettableIterator[] 
newResettableIteratorArrayInstance(int length) {
+  return 

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246337126
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246338675
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246377289
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ---
@@ -2409,14 +2410,10 @@ private void deliver() {
   // Either the iterator is empty or the consumer is busy
   int noDelivery = 0;
 
-  int size = 0;
-
-  int endPos = -1;
-
   int handled = 0;
 
   long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
-
+  consumers.reset();
--- End diff --

If an exception is thrown before it, is it OK that `reset` won't be 
called? 
If not, it would be better to wrap the whole logic in `try..finally` and 
call `reset` there.

---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246374340
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
--- End diff --

I would just read `changedIterator` once into a local variable and perform 
the logic on that local value, instead of doing a volatile load twice
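
Roughly what is meant, as a sketch on a hypothetical holder class. The field 
name follows the diff, but the types are simplified to plain 
`java.util.Iterator`, and clearing the field with `compareAndSet` is an 
assumption about the rest of `reset()`, which is not shown in the diff.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical holder that swaps in a new iterator view on reset().
final class IteratorSwap {
   @SuppressWarnings("rawtypes")
   private static final AtomicReferenceFieldUpdater<IteratorSwap, Iterator> CHANGED =
      AtomicReferenceFieldUpdater.newUpdater(IteratorSwap.class, Iterator.class, "changedIterator");

   private volatile Iterator<String> changedIterator;
   private Iterator<String> currentIterator = Collections.emptyIterator();

   // Called by a modifying thread to publish a fresh view.
   void onChange(Iterator<String> replacement) {
      changedIterator = replacement;
   }

   IteratorSwap reset() {
      // Single volatile load; everything below works on the local copy.
      final Iterator<String> changed = changedIterator;
      if (changed != null) {
         currentIterator = changed;
         // Clear only if no newer view was published in the meantime.
         CHANGED.compareAndSet(this, changed, null);
      }
      return this;
   }

   boolean hasNext() {
      return currentIterator.hasNext();
   }

   String next() {
      return currentIterator.next();
   }
}
```

Besides saving a load, the local copy guarantees that the null check and the 
assignment see the same value, which two separate volatile reads do not.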


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246313269
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiIterator implements Iterator {
+
+   private final Iterator[] iterators;
+   int index = -1;
--- End diff --

Make this `private`, given that it doesn't seem to be needed by a subclass


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246323500
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiIterator implements Iterator {
+
+   private final Iterator[] iterators;
+   int index = -1;
+
+   public MultiIterator(Iterator[] iterators) {
--- End diff --

`I[]`


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246375651
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
--- End diff --

What happens if the CAS fails? We end up with a `reset()` that does not 
null out `changedIterator`.
Given that we are just clearing `changedIterator` to `null`, a 
`lazySet(this, null)` is enough.
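
A minimal sketch of the idea, not the PR's code: `PendingViewHolder`, `publish` and the `String` payload are hypothetical stand-ins for `QueueConsumersImpl`, `add`/`remove` and the iterator view. It assumes that `publish` and `reset` never overlap (single-writer semantics); with truly concurrent writers an unconditional clear could drop a freshly published view.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical stand-in for QueueConsumersImpl; only the hand-off of the pending view is modelled.
final class PendingViewHolder {

   private static final AtomicReferenceFieldUpdater<PendingViewHolder, String> PENDING =
      AtomicReferenceFieldUpdater.newUpdater(PendingViewHolder.class, String.class, "pending");

   // Written by publish() (add/remove in the PR), read by reset().
   private volatile String pending;

   // Only touched by the thread that calls reset().
   private String current = "initial";

   // Assumption: never runs concurrently with reset().
   void publish(String view) {
      PENDING.lazySet(this, view);
   }

   String reset() {
      final String next = pending;      // read the volatile field once
      if (next != null) {
         current = next;
         PENDING.lazySet(this, null);   // unconditional clear: there is no CAS that could fail
      }
      return current;
   }
}
```

Reading the field once into a local also avoids the original pattern of reading `changedIterator` three times, where each read could in principle observe a different value.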


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246323998
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+/**
+ * Extends MultiIterator, adding the ability if the underlying iterators 
are resettable, then its self can reset.
+ * It achieves this by going back to the first iterator, and as moves to 
another iterator it resets it.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiResettableIterator extends MultiIterator 
implements ResettableIterator {
+
+   public MultiResettableIterator(ResettableIterator[] iterators) {
+  super(iterators);
+   }
+
+   @Override
+   protected void moveTo(int index) {
+  super.moveTo(index);
+  if (index > -1) {
+ ((ResettableIterator) get(index)).reset();
--- End diff --

This cast could be avoided thanks to the generics changes suggested on 
`MultiIterator`.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246376376
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
+  }
+  return this;
+   }
+
+   @Override
+   public boolean add(T t) {
+  boolean result = consumers.add(t);
+  if (result) {
+ changedIteratorFieldUpdater.set(this, 
consumers.resettableIterator());
+  }
+  return result;
+   }
+
+   @Override
+   public boolean remove(T t) {
+  boolean result = consumers.remove(t);
+  if (result) {
+ changedIteratorFieldUpdater.set(this, 
consumers.resettableIterator());
--- End diff --

`lazySet` is enough for single-writer/single-threaded semantics


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246323602
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiIterator implements Iterator {
+
+   private final Iterator[] iterators;
+   int index = -1;
+
+   public MultiIterator(Iterator[] iterators) {
+  this.iterators = iterators;
+   }
+
+   @Override
+   public boolean hasNext() {
+  while (true) {
+ if (index != -1) {
+Iterator currentIterator = get(index);
+if (currentIterator.hasNext()) {
+   return true;
+}
+ }
+ int next = index + 1;
+ if (next < iterators.length) {
+moveTo(next);
+ } else {
+return false;
+ }
+  }
+   }
+
+   @Override
+   public T next() {
+  while (true) {
+ if (index != -1) {
+Iterator currentIterator = get(index);
+if (currentIterator.hasNext()) {
+   return currentIterator.next();
+}
+ }
+ int next = index + 1;
+ if (next < iterators.length) {
+moveTo(next);
+ } else {
+return null;
+ }
+  }
+   }
+
+   protected void moveTo(int index) {
+  this.index = index;
+   }
+
+   protected Iterator get(int index) {
--- End diff --

```java
protected I get(int index) 
```


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246322551
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiIterator implements Iterator {
+
+   private final Iterator[] iterators;
--- End diff --

`I[]`: see the comments above


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246344280
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -84,6 +94,10 @@ public void setFilterString(SimpleString filterString) {
   this.filterString = filterString;
}
 
+   public void setPriority(byte priority) {
--- End diff --

Is this setter needed? I can't see it being called by anyone: if not, the 
`priority` field could be declared as `final`


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246377787
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ---
@@ -2497,45 +2484,36 @@ private void deliver() {
 
 
   handled++;
-
+  consumers.reset();
   continue;
}
 
if (logger.isTraceEnabled()) {
   logger.trace("Queue " + this.getName() + " is delivering 
reference " + ref);
}
 
-   // If a group id is set, then this overrides the consumer 
chosen round-robin
-
-   SimpleString groupID = extractGroupID(ref);
-
-   if (groupID != null) {
-  groupConsumer = groups.get(groupID);
+   final SimpleString groupID = extractGroupID(ref);
+   groupConsumer = getGroupConsumer(groupConsumer, groupID);
 
-  if (groupConsumer != null) {
- consumer = groupConsumer;
-  }
-   }
-
-   if (exclusive && redistributor == null) {
-  consumer = consumerList.get(0).consumer;
+   if (groupConsumer != null) {
+  consumer = groupConsumer;
}
 
HandleStatus status = handle(ref, consumer);
 
if (status == HandleStatus.HANDLED) {
 
-  deliveriesInTransit.countUp();
-
-  handledconsumer = consumer;
-
-  removeMessageReference(holder, ref);
-
   if (redistributor == null) {
  handleMessageGroup(ref, consumer, groupConsumer, 
groupID);
   }
 
+  deliveriesInTransit.countUp();
+
+
+  removeMessageReference(holder, ref);
+  handledconsumer = consumer;
   handled++;
+  consumers.reset();
--- End diff --

If `removeMessageReference` throws an exception, is it fine for 
`consumers.reset` not to be called?
If not, use `try...finally`.
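
Something along these lines, as a sketch only: `ResetInFinallySketch`, `deliverOne` and the counter are hypothetical, and the counter increment stands in for `consumers.reset()`. Whether the reset should really run on failure is the open question above.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the delivery step; not QueueImpl code.
final class ResetInFinallySketch {

   // Counts reset calls so the effect is observable.
   final AtomicInteger resets = new AtomicInteger();

   void deliverOne(Runnable removeMessageReference) {
      try {
         removeMessageReference.run();
      } finally {
         // Stands in for consumers.reset(): runs on both the normal and the exceptional path.
         resets.incrementAndGet();
      }
   }
}
```

The exception still propagates to the caller; `finally` only guarantees that the iterator is rewound first.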


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246321398
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiIterator implements Iterator {
--- End diff --

`MultiIterator<I extends Iterator<T>, T> implements Iterator<T>`
This should avoid casting in child classes when they manipulate the `iterators` type
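
To illustrate how the extra type parameter removes the cast, here is a self-contained sketch. All `*Sketch` names, `ListResettableIterator` and the `over` factory are hypothetical, `reset()` is simplified to return `void`, and `next()` throws `NoSuchElementException` when exhausted instead of returning `null` as the PR does.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical, simplified version of ResettableIterator.
interface ResettableIteratorSketch<T> extends Iterator<T> {
   void reset();
}

// I is the concrete iterator type, so get(int) can return it without a cast.
class MultiIteratorSketch<I extends Iterator<T>, T> implements Iterator<T> {
   private final I[] iterators;
   private int index = -1;

   MultiIteratorSketch(I[] iterators) {
      this.iterators = iterators;
   }

   @Override
   public boolean hasNext() {
      while (true) {
         if (index != -1 && get(index).hasNext()) {
            return true;
         }
         int next = index + 1;
         if (next < iterators.length) {
            moveTo(next);
         } else {
            return false;
         }
      }
   }

   @Override
   public T next() {
      if (!hasNext()) {
         throw new NoSuchElementException();
      }
      return get(index).next();
   }

   protected void moveTo(int index) {
      this.index = index;
   }

   protected I get(int index) {
      return iterators[index];
   }
}

class MultiResettableIteratorSketch<T> extends MultiIteratorSketch<ResettableIteratorSketch<T>, T>
   implements ResettableIteratorSketch<T> {

   MultiResettableIteratorSketch(ResettableIteratorSketch<T>[] iterators) {
      super(iterators);
   }

   // Hypothetical helper that builds the iterator from plain lists.
   static <T> MultiResettableIteratorSketch<T> over(List<List<T>> lists) {
      @SuppressWarnings("unchecked")
      ResettableIteratorSketch<T>[] parts = new ResettableIteratorSketch[lists.size()];
      for (int i = 0; i < parts.length; i++) {
         parts[i] = new ListResettableIterator<>(lists.get(i));
      }
      return new MultiResettableIteratorSketch<>(parts);
   }

   @Override
   protected void moveTo(int index) {
      super.moveTo(index);
      if (index > -1) {
         get(index).reset(); // no cast: get() already returns ResettableIteratorSketch<T>
      }
   }

   @Override
   public void reset() {
      moveTo(-1);
   }
}

// Minimal list-backed resettable iterator used to drive the sketch.
final class ListResettableIterator<T> implements ResettableIteratorSketch<T> {
   private final List<T> items;
   private int pos;

   ListResettableIterator(List<T> items) {
      this.items = items;
   }

   @Override
   public boolean hasNext() {
      return pos < items.size();
   }

   @Override
   public T next() {
      if (!hasNext()) {
         throw new NoSuchElementException();
      }
      return items.get(pos++);
   }

   @Override
   public void reset() {
      pos = 0;
   }
}
```

The only unchecked operation left is the generic array creation in the factory, which is a limitation of Java arrays rather than of this design.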


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246376356
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
+  }
+  return this;
+   }
+
+   @Override
+   public boolean add(T t) {
+  boolean result = consumers.add(t);
+  if (result) {
+ changedIteratorFieldUpdater.set(this, 
consumers.resettableIterator());
--- End diff --

`lazySet` is enough for single-writer/single-threaded semantics


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246323811
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+/**
+ * Extends MultiIterator, adding the ability if the underlying iterators 
are resettable, then its self can reset.
+ * It achieves this by going back to the first iterator, and as moves to 
another iterator it resets it.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiResettableIterator extends MultiIterator 
implements ResettableIterator {
--- End diff --

```java
public class MultiResettableIterator<T> extends 
MultiIterator<ResettableIterator<T>, T> implements ResettableIterator<T> {
```


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246376178
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
--- End diff --

I assume that `reset` is meant to be called by only one thread at a time; if 
not, things get complex, because `currentIterator` could be changed 
independently


---


[GitHub] activemq-artemis pull request #1793: ARTEMIS-1498: Openwire internal headers...

2019-01-09 Thread RaiSaurabh
Github user RaiSaurabh closed the pull request at:

https://github.com/apache/activemq-artemis/pull/1793


---


[GitHub] activemq-artemis pull request #2494: ARTEMIS-2224 Reduce contention on LiveP...

2019-01-09 Thread franz1981
GitHub user franz1981 opened a pull request:

https://github.com/apache/activemq-artemis/pull/2494

ARTEMIS-2224 Reduce contention on LivePageCacheImpl

It includes:

- **lock-free LivePageCache + tests**:
LivePageCacheImpl has been reimplemented to be
lock-free, multi-producer and multi-consumer
in any of its operations.
- **Avoid unnecessary page cache queries on ack TX**:
PageSubscriptionImpl::ackTx is already performing a counter update
using the message persistent size: the size can be reused on
PagePosition::setPersistentSize, which avoids querying the page cache just
to compute it.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/franz1981/activemq-artemis lock-free-paging

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2494.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2494






---


[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...

2019-01-08 Thread onlyMIT
Github user onlyMIT commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2491#discussion_r246247094
  
--- Diff: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 ---
@@ -117,14 +118,11 @@ boolean getStopped() {
}
 
boolean isClean() {
-  return isClean;
+  return clean;
}
 
-   void setIsClean(boolean isClean) throws Exception {
-  this.isClean = isClean;
-  if (isClean) {
- clean();
-  }
+   void setClean(boolean clean) throws Exception {
+  this.clean = clean;
--- End diff --

Looks good to me!


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140306
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
+  AmqpReceiver receiver1 = session.createReceiver(getQueueName(), 
null, false, false, properties1);
+  receiver1.flow(100);
+
+  Map properties2 = new HashMap<>();
+  properties2.put(Symbol.getSymbol("priority"), 10);
+  AmqpReceiver receiver2 = session.createReceiver(getQueueName(), 
null, false, false, properties2);
+  receiver2.flow(100);
+
+  Map properties3 = new HashMap<>();
+  properties3.put(Symbol.getSymbol("priority"), 5);
+  AmqpReceiver receiver3 = session.createReceiver(getQueueName(), 
null, false, false, properties3);
+  receiver3.flow(100);
+
+  sendMessages(getQueueName(), 5);
+
+
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message1 = receiver1.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message3 = receiver3.receive(250, 
TimeUnit.MILLISECONDS);
+ assertNotNull("did not receive message first time", message1);
+ assertEquals("MessageID:" + i, message1.getMessageId());
+ message1.accept();
+ assertNull("message is not meant to goto lower priority 
receiver", message2);
+ assertNull("message is not meant to goto lower priority 
receiver", message3);
+  }
+
+  //Close the high priority receiver
+  receiver1.close();
+
+  sendMessages(getQueueName(), 5);
+
+  //Check messages now goto next priority receiver
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message3 = receiver3.receive(250, 
TimeUnit.MILLISECONDS);
--- End diff --

changed to receiveNoWait()


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140214
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
--- End diff --

changed up the ordering in this test also.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140041
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
+  AmqpReceiver receiver1 = session.createReceiver(getQueueName(), 
null, false, false, properties1);
+  receiver1.flow(100);
+
+  Map properties2 = new HashMap<>();
+  properties2.put(Symbol.getSymbol("priority"), 10);
+  AmqpReceiver receiver2 = session.createReceiver(getQueueName(), 
null, false, false, properties2);
+  receiver2.flow(100);
+
+  Map properties3 = new HashMap<>();
+  properties3.put(Symbol.getSymbol("priority"), 5);
+  AmqpReceiver receiver3 = session.createReceiver(getQueueName(), 
null, false, false, properties3);
+  receiver3.flow(100);
+
+  sendMessages(getQueueName(), 5);
+
+
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message1 = receiver1.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
--- End diff --

changed to receiveNoWait


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135818
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+/**
+ * Exclusive Test
+ */
+public class ConsumerPriorityTest extends JMSTestBase {
+
+   private SimpleString queueName = 
SimpleString.toSimpleString("jms.consumer.priority.queue");
--- End diff --

Now using getName(). Again, nice.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135697
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 ---
@@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext 
protonSender,
   return consumer;
}
 
+   private int getPriority(Map properties) {
+  Integer value = properties == null ? null : (Integer) 
properties.get(PRIORITY);
--- End diff --

I have changed this; could you recheck to make sure I understood?


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135542
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -52,6 +57,7 @@ public String toString() {
   StringBuffer buff = new StringBuffer(getParentString());
   buff.append(", queueName=" + queueName);
   buff.append(", filterString=" + filterString);
+  buff.append(", priority=" + priority);
--- End diff --

done


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246133832
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+/**
+ * Exclusive Test
+ */
+public class ConsumerPriorityTest extends JMSTestBase {
+
+   private SimpleString queueName = 
SimpleString.toSimpleString("jms.consumer.priority.queue");
--- End diff --

Nice, I didn't know about that in the parent class. Will change to use this.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246132135
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java
 ---
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+
+import 
org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Before;
+import org.junit.Test;
+
+public class QueueConsumerPriorityTest extends BasicOpenWireTest {
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+  super.setUp();
+  this.makeSureCoreQueueExist("QUEUE.A");
+   }
+   @Test
+   public void testQueueConsumerPriority() throws JMSException, 
InterruptedException {
+  connection.start();
+  Session consumerLowPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  Session consumerHighPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  assertNotNull(consumerHighPriority);
+  Session senderSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  String queueName = "QUEUE.A";
+  ActiveMQQueue low = new ActiveMQQueue(queueName + 
"?consumer.priority=1");
+  MessageConsumer lowConsumer = 
consumerLowPriority.createConsumer(low);
+
+  ActiveMQQueue high = new ActiveMQQueue(queueName + 
"?consumer.priority=2");
+  MessageConsumer highConsumer = 
consumerLowPriority.createConsumer(high);
+
+  ActiveMQQueue senderQueue = new ActiveMQQueue(queueName);
+
+  MessageProducer producer = senderSession.createProducer(senderQueue);
+
+  Message msg = senderSession.createTextMessage("test");
+  for (int i = 0; i < 1000; i++) {
+ producer.send(msg);
+ assertNotNull("null on iteration: " + i, 
highConsumer.receive(1000));
+  }
+  assertNull(lowConsumer.receive(2000));
--- End diff --

This is the original test from ActiveMQ 5; I was trying to keep it as 
untouched as possible to ensure the behavior is the same.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246128551
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
--- End diff --

This is actually tested in the queueconsumerimpl test, but I agree we can do 
the same here.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246128143
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -104,6 +119,11 @@ public void decodeRest(final ActiveMQBuffer buffer) {
   filterString = buffer.readNullableSimpleString();
   browseOnly = buffer.readBoolean();
   requiresResponse = buffer.readBoolean();
+  if (buffer.readableBytes() > 0) {
--- End diff --

This is the typical pattern for safely adding a new field that can be 
either nullable or defaultable. It is used many times over.
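
A minimal sketch of that pattern, assuming a plain `java.nio.ByteBuffer` as a stand-in for `ActiveMQBuffer` (so `remaining()` plays the role of `readableBytes()`). The class name, the helper methods, and `DEFAULT_PRIORITY` are hypothetical and only illustrate the idea: the new field is appended at the end, and the decoder reads it only when bytes are left over.

```java
import java.nio.ByteBuffer;

public class OptionalTrailingField {

   // Hypothetical default used when an older sender omits the field.
   static final int DEFAULT_PRIORITY = 0;

   // Older wire format: only the two boolean flags.
   static ByteBuffer encodeOld(boolean browseOnly, boolean requiresResponse) {
      ByteBuffer buffer = ByteBuffer.allocate(2);
      buffer.put((byte) (browseOnly ? 1 : 0));
      buffer.put((byte) (requiresResponse ? 1 : 0));
      buffer.flip();
      return buffer;
   }

   // Newer wire format: same prefix, with the priority appended at the end.
   static ByteBuffer encodeNew(boolean browseOnly, boolean requiresResponse, int priority) {
      ByteBuffer buffer = ByteBuffer.allocate(6);
      buffer.put((byte) (browseOnly ? 1 : 0));
      buffer.put((byte) (requiresResponse ? 1 : 0));
      buffer.putInt(priority);
      buffer.flip();
      return buffer;
   }

   // Reads the common prefix, then the trailing field only if bytes remain.
   static int decodePriority(ByteBuffer buffer) {
      buffer.get(); // browseOnly
      buffer.get(); // requiresResponse
      return buffer.remaining() > 0 ? buffer.getInt() : DEFAULT_PRIORITY;
   }

   public static void main(String[] args) {
      System.out.println(decodePriority(encodeOld(false, true)));    // prints 0
      System.out.println(decodePriority(encodeNew(false, true, 7))); // prints 7
   }
}
```

The trade-off is that the decoder infers the sender's format from the payload length rather than from an explicit version number, so it only works for fields added at the tail.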


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246127864
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -52,6 +57,7 @@ public String toString() {
   StringBuffer buff = new StringBuffer(getParentString());
   buff.append(", queueName=" + queueName);
   buff.append(", filterString=" + filterString);
+  buff.append(", priority=" + priority);
--- End diff --

Makes sense


---


[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...

2019-01-08 Thread jbertram
Github user jbertram commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2491#discussion_r246123082
  
--- Diff: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 ---
@@ -117,14 +118,11 @@ boolean getStopped() {
}
 
boolean isClean() {
-  return isClean;
+  return clean;
}
 
-   void setIsClean(boolean isClean) throws Exception {
-  this.isClean = isClean;
-  if (isClean) {
- clean();
-  }
+   void setClean(boolean clean) throws Exception {
+  this.clean = clean;
--- End diff --

I pushed an update to address the issue you identified. Thanks!


---


[GitHub] activemq-artemis pull request #2493: ARTEMIS-2223 when a new consumer is cre...

2019-01-08 Thread onlyMIT
Github user onlyMIT closed the pull request at:

https://github.com/apache/activemq-artemis/pull/2493


---


[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...

2019-01-08 Thread onlyMIT
Github user onlyMIT commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2491#discussion_r246095670
  
--- Diff: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 ---
@@ -117,14 +118,11 @@ boolean getStopped() {
}
 
boolean isClean() {
-  return isClean;
+  return clean;
}
 
-   void setIsClean(boolean isClean) throws Exception {
-  this.isClean = isClean;
-  if (isClean) {
- clean();
-  }
+   void setClean(boolean clean) throws Exception {
+  this.clean = clean;
--- End diff --

@jbertram The `getSessionState` method only clears the session state; it 
does not call the `clean()` method, so the queue is not actually cleaned up.
I ran this code against the ‘paho’ test. The first consumer connects with 
"cleanSession=false", and a producer with a different clientID sends a 
message. After closing the producer and consumer, I open a second consumer 
with the same clientID and "cleanSession=true", and find that it consumes the 
legacy message left in the queue. So I suspected that there was a problem 
with the test code.
I kept looking for why my test run consumed the legacy messages in the 
queue while yours did not.
After seeing your information, I re-reviewed the code and found that the 
test code did not have any problems. What caused my doubts is that, because 
of your change, when cleanSession=true only the MQTTSessionState is cleaned 
up; the queue still exists, and the legacy messages in the queue are consumed 
when resubscribing.
[#2493](https://github.com/apache/activemq-artemis/pull/2493) can be closed. 
I think you need to review your change.


---


[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...

2019-01-08 Thread jbertram
Github user jbertram commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2491#discussion_r246075935
  
--- Diff: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 ---
@@ -117,14 +118,11 @@ boolean getStopped() {
}
 
boolean isClean() {
-  return isClean;
+  return clean;
}
 
-   void setIsClean(boolean isClean) throws Exception {
-  this.isClean = isClean;
-  if (isClean) {
- clean();
-  }
+   void setClean(boolean clean) throws Exception {
+  this.clean = clean;
--- End diff --

The expectation of a "setter" method is simply to _set_ a variable and 
nothing more. The additional logic in the `setClean` method is not intuitive 
which is why I removed it and put it in the `getSessionState` method.
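
As a rough illustration of that point (not the actual `MQTTSession` code), the sketch below uses a hypothetical `Session` class whose setter only stores the flag, while the clean-up runs in a separate, explicitly named step. All names here are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

public class SetterSideEffectDemo {

   // Hypothetical session; illustrative only, not the Artemis MQTTSession API.
   static final class Session {
      private boolean clean;
      private final List<String> subscriptions = new ArrayList<>();

      // Plain setter: stores the flag and does nothing else.
      void setClean(boolean clean) {
         this.clean = clean;
      }

      boolean isClean() {
         return clean;
      }

      void subscribe(String topic) {
         subscriptions.add(topic);
      }

      int subscriptionCount() {
         return subscriptions.size();
      }

      // The clean-up is a separate step that callers invoke deliberately.
      void resolveState() {
         if (clean) {
            subscriptions.clear();
         }
      }
   }

   public static void main(String[] args) {
      Session session = new Session();
      session.subscribe("a/b");
      session.setClean(true);
      System.out.println(session.subscriptionCount()); // prints 1: the setter had no side effect
      session.resolveState();
      System.out.println(session.subscriptionCount()); // prints 0
   }
}
```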


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2019-01-08 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r246058995
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 ---
@@ -730,22 +793,29 @@ public int deliverMessage(MessageReference 
messageReference, int deliveryCount,
 
 if (preSettle) {
// Presettled means the client implicitly accepts any 
delivery we send it.
-   sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   try {
+  sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   } catch (Exception e) {
+  log.debug(e.getMessage(), e);
+   }
delivery.settle();
 } else {
sender.advance();
 }
 
 connection.flush();
  } finally {
-connection.unlock();
+synchronized (creditsLock) {
+   pending.decrementAndGet();
+}
+if (releaseRequired) {
+   ((NettyReadable) sendBuffer).getByteBuf().release();
+}
  }
+  } catch (Exception e) {
+ log.warn(e.getMessage(), e);
 
- return size;
-  } finally {
- if (releaseRequired) {
-((NettyReadable) sendBuffer).getByteBuf().release();
- }
+ // important todo: Error treatment
--- End diff --

I'm working on it. I'm out today in a meeting... will be done tomorrow (Wed)


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245974624
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
--- End diff --

I'd suggest creating consumers with priorities out of order (e.g. highest in 
the middle), so they aren't simply registered in sequence, as otherwise a 
simple failure to round-robin delivery attempts (given every receiver has 
enough credit to receive all messages) might also lead to the expected result 
even without any priority handling consideration.
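
To make the concern concrete, here is a small self-contained simulation. It does not use the broker or the AMQP test client; `Consumer`, `brokenDispatch`, and `priorityDispatch` are hypothetical names. It assumes a deliberately broken dispatcher that always hands messages to the first registered consumer, and shows that this bug is invisible when the highest priority is registered first.

```java
import java.util.ArrayList;
import java.util.List;

public class RegistrationOrderDemo {

   // Hypothetical stand-in for a consumer: just a name and a priority.
   static final class Consumer {
      final String name;
      final int priority;

      Consumer(String name, int priority) {
         this.name = name;
         this.priority = priority;
      }
   }

   // Deliberately broken dispatcher: ignores priority and never round-robins,
   // so every message goes to the first registered consumer.
   static String brokenDispatch(List<Consumer> consumers) {
      return consumers.get(0).name;
   }

   // Intended behaviour: the consumer with the highest priority wins.
   static String priorityDispatch(List<Consumer> consumers) {
      Consumer best = consumers.get(0);
      for (Consumer c : consumers) {
         if (c.priority > best.priority) {
            best = c;
         }
      }
      return best.name;
   }

   // Registers consumers in exactly the order the priorities are given.
   static List<Consumer> register(int... priorities) {
      List<Consumer> consumers = new ArrayList<>();
      for (int p : priorities) {
         consumers.add(new Consumer("priority-" + p, p));
      }
      return consumers;
   }

   public static void main(String[] args) {
      List<Consumer> inOrder = register(50, 10, 5);
      List<Consumer> shuffled = register(10, 50, 5);

      // Highest registered first: the broken dispatcher looks correct.
      System.out.println(brokenDispatch(inOrder).equals(priorityDispatch(inOrder)));   // prints true
      // Highest registered in the middle: the bug becomes visible.
      System.out.println(brokenDispatch(shuffled).equals(priorityDispatch(shuffled))); // prints false
   }
}
```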


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245973668
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
+  AmqpReceiver receiver1 = session.createReceiver(getQueueName(), 
null, false, false, properties1);
+  receiver1.flow(100);
+
+  Map properties2 = new HashMap<>();
+  properties2.put(Symbol.getSymbol("priority"), 10);
+  AmqpReceiver receiver2 = session.createReceiver(getQueueName(), 
null, false, false, properties2);
+  receiver2.flow(100);
+
+  Map properties3 = new HashMap<>();
+  properties3.put(Symbol.getSymbol("priority"), 5);
+  AmqpReceiver receiver3 = session.createReceiver(getQueueName(), 
null, false, false, properties3);
+  receiver3.flow(100);
+
+  sendMessages(getQueueName(), 5);
+
+
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message1 = receiver1.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
--- End diff --

Burning 250ms twice per loop seems excessive. There is a receiveNoWait that 
could be used for initial verification nothing arrived, and/or a small final 
timed wait could be done outside the loop afterwards. Alternatively, 
pullImmediate() would avoid unnecessary waiting.
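
The shape of that suggestion can be sketched without the AMQP client at all. The example below is an assumption-laden stand-in: it models each receiver's inbox as a `java.util.concurrent.BlockingQueue`, with `poll()` playing the role of a no-wait receive and `poll(timeout, unit)` the timed one. `NoWaitCheckDemo` and `verify` are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class NoWaitCheckDemo {

   // Checks that the high-priority inbox yields `count` messages while the
   // low-priority inbox stays empty. Returns true if both hold.
   static boolean verify(BlockingQueue<String> high, BlockingQueue<String> low, int count) {
      try {
         for (int i = 0; i < count; i++) {
            // Timed wait only where a message is expected; it returns as soon as one arrives.
            if (high.poll(250, TimeUnit.MILLISECONDS) == null) {
               return false;
            }
            // Non-blocking check where nothing is expected.
            if (low.poll() != null) {
               return false;
            }
         }
         // One short timed wait after the loop to catch late stragglers.
         return low.poll(50, TimeUnit.MILLISECONDS) == null;
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
      }
   }

   public static void main(String[] args) {
      BlockingQueue<String> high = new LinkedBlockingQueue<>();
      BlockingQueue<String> low = new LinkedBlockingQueue<>();
      for (int i = 0; i < 5; i++) {
         high.add("MessageID:" + i);
      }
      System.out.println(verify(high, low, 5)); // prints true
   }
}
```

With this layout the negative checks cost one short wait in total instead of a full timeout on every iteration.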


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245968414
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+/**
+ * Exclusive Test
+ */
+public class ConsumerPriorityTest extends JMSTestBase {
+
+   private SimpleString queueName = 
SimpleString.toSimpleString("jms.consumer.priority.queue");
--- End diff --

Rather than hard coding a shared name, using the test name for the queue
name is nice, as it isolates different tests and makes the relationship clear,
which sometimes makes it easier to work on issues with particular tests later.
There is a test name rule in the parent class, and a getName() method that can
be used with it.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245955337
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -104,6 +119,11 @@ public void decodeRest(final ActiveMQBuffer buffer) {
   filterString = buffer.readNullableSimpleString();
   browseOnly = buffer.readBoolean();
   requiresResponse = buffer.readBoolean();
+  if (buffer.readableBytes() > 0) {
--- End diff --

I assume this is to allow for old clients that don't send this value. Would
a more specific version check be clearer here for later reference? Related, I'm
guessing other changes already made for 2.7.0 have updated the version info,
since it doesn't appear to change here?

Also, is the reverse case safe? Does an older server failing to read the
additional value (seemingly always sent now) have potential to lead to any
issues on older servers, i.e. how might the buffer continue to be used later, if
at all? Should the client omit the value for older servers? (Or does the
presumed version change prevent the new client working with the old server
anyway? I don't know how that stuff is handled, just commenting from reading
the diff here).
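
To make the two directions concrete, here is a small sketch. The assumptions are mine: java.nio.ByteBuffer stands in for ActiveMQBuffer, the message is reduced to two flags plus the optional trailing int, and DEFAULT_PRIORITY and PRIORITY_SINCE_VERSION are hypothetical values, not taken from the PR.

```java
import java.nio.ByteBuffer;

// Simplified model of a message with an optional trailing field.
final class OptionalTrailingFieldSketch {

   static final int DEFAULT_PRIORITY = 0;        // assumed default
   static final int PRIORITY_SINCE_VERSION = 2;  // hypothetical wire version

   // A null priority models an old client that never writes the field.
   static ByteBuffer encode(boolean browseOnly, boolean requiresResponse, Integer priority) {
      ByteBuffer buffer = ByteBuffer.allocate(2 + (priority == null ? 0 : Integer.BYTES));
      buffer.put((byte) (browseOnly ? 1 : 0));
      buffer.put((byte) (requiresResponse ? 1 : 0));
      if (priority != null) {
         buffer.putInt(priority);
      }
      buffer.flip();
      return buffer;
   }

   // Presence-based decode, the approach in the diff: read only if bytes remain.
   static int decodeByPresence(ByteBuffer buffer) {
      buffer.get(); // browseOnly
      buffer.get(); // requiresResponse
      return buffer.remaining() >= Integer.BYTES ? buffer.getInt() : DEFAULT_PRIORITY;
   }

   // Version-based decode: the peer's advertised version decides explicitly.
   // Throws BufferUnderflowException if the peer claims support but omits the field.
   static int decodeByVersion(ByteBuffer buffer, int peerVersion) {
      buffer.get(); // browseOnly
      buffer.get(); // requiresResponse
      return peerVersion >= PRIORITY_SINCE_VERSION ? buffer.getInt() : DEFAULT_PRIORITY;
   }

   // Reverse case: an old decoder stops early and leaves bytes unread.
   static int unreadAfterLegacyDecode(ByteBuffer buffer) {
      buffer.get(); // browseOnly
      buffer.get(); // requiresResponse
      return buffer.remaining();
   }
}
```

Whether those unread trailing bytes matter depends on how the buffer is used after decoding, which is the part I can't judge from the diff alone.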


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245973707
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
+  AmqpReceiver receiver1 = session.createReceiver(getQueueName(), 
null, false, false, properties1);
+  receiver1.flow(100);
+
+  Map properties2 = new HashMap<>();
+  properties2.put(Symbol.getSymbol("priority"), 10);
+  AmqpReceiver receiver2 = session.createReceiver(getQueueName(), 
null, false, false, properties2);
+  receiver2.flow(100);
+
+  Map properties3 = new HashMap<>();
+  properties3.put(Symbol.getSymbol("priority"), 5);
+  AmqpReceiver receiver3 = session.createReceiver(getQueueName(), 
null, false, false, properties3);
+  receiver3.flow(100);
+
+  sendMessages(getQueueName(), 5);
+
+
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message1 = receiver1.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message3 = receiver3.receive(250, 
TimeUnit.MILLISECONDS);
+ assertNotNull("did not receive message first time", message1);
+ assertEquals("MessageID:" + i, message1.getMessageId());
+ message1.accept();
+ assertNull("message is not meant to goto lower priority 
receiver", message2);
+ assertNull("message is not meant to goto lower priority 
receiver", message3);
+  }
+
+  //Close the high priority receiver
+  receiver1.close();
+
+  sendMessages(getQueueName(), 5);
+
+  //Check messages now goto next priority receiver
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message3 = receiver3.receive(250, 
TimeUnit.MILLISECONDS);
--- End diff --

As above.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245953999
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -52,6 +57,7 @@ public String toString() {
   StringBuffer buff = new StringBuffer(getParentString());
   buff.append(", queueName=" + queueName);
   buff.append(", filterString=" + filterString);
+  buff.append(", priority=" + priority);
--- End diff --

Nitpicking: the other details seem to be emitted 'in order' relative to the
buffer content, so would it make sense to put this at the end, consistent with
its location in the buffer?


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245965929
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 ---
@@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext 
protonSender,
   return consumer;
}
 
+   private int getPriority(Map properties) {
+  Integer value = properties == null ? null : (Integer) 
properties.get(PRIORITY);
--- End diff --

Comments on the original #2488 PR suggest you want to align with Qpid
Broker-J in this area. Its support (and the accompanying documentation lift)
describes the value as integral, so the value here is not necessarily going
to be of the Integer type.
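
Something along these lines would tolerate any integral type rather than casting to Integer. This is a sketch under assumptions of my own: a String key stands in for the Symbol key, DEFAULT_PRIORITY is a made-up default, and clamping out-of-range long values is just one possible policy.

```java
import java.util.Map;

// Reads an integral "priority" property without assuming it is an Integer.
final class IntegralPrioritySketch {

   static final String PRIORITY = "priority"; // stand-in for the Symbol key
   static final int DEFAULT_PRIORITY = 0;     // assumed default

   static int getPriority(Map<String, Object> properties) {
      Object value = properties == null ? null : properties.get(PRIORITY);
      if (value instanceof Byte || value instanceof Short
            || value instanceof Integer || value instanceof Long) {
         long wide = ((Number) value).longValue();
         // Clamp instead of silently wrapping when a long exceeds the int range.
         return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, wide));
      }
      // Missing or non-integral values fall back to the default.
      return DEFAULT_PRIORITY;
   }
}
```

A plain (Integer) cast would instead throw ClassCastException as soon as a peer encodes the value as, say, a long or a short.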


---

