[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246574781
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  ResettableIterator[] 
newResettableIteratorArrayInstance(int length) {
+  

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246558298
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
--- End diff --

currentIterator is ONLY updated by reset, which should ONLY be called by 
the same threads operating on the ResettableIterator interface.

When add or remove of consumer occurs a new iterator is parked into the 
volatile changedIterator (using an atomic field updater), so the next reset can 
pick it up. 



---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246557671
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  ResettableIterator[] 
newResettableIteratorArrayInstance(int length) {
+  return 

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246555304
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
+  }
+  return this;
+   }
+
+   @Override
+   public boolean add(T t) {
+  boolean result = consumers.add(t);
+  if (result) {
+ changedIteratorFieldUpdater.set(this, 
consumers.resettableIterator());
--- End diff --

see other comment


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246551345
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
--- End diff --

Tomorrow I will give another look but it doesn't seems correct to me if 
this code can be run by different threads: currentIterator is not guarded by 
any atomic operations to be safely published and changedIterator is volatile 
load 3 times before hitting the case...


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246525755
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
--- End diff --

The compare and set is because the new iterator could be created whilst we 
are mid resetting. The iterator is single thread usage only. Updating the 
collection should be thread safe and able to occur in parralell.

This is much like iterating copyonwriteareatlist. Iterator works on a 
snapshot view and this is single thread. Updating the collection itself can be 
concurrent and occur whilst someone else is iterating 


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246523218
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiIterator implements Iterator {
+
+   private final Iterator[] iterators;
+   int index = -1;
--- End diff --

Agreed


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246521520
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiIterator implements Iterator {
--- End diff --

Yeah i had a version like that but it makes using simple MultiIterator ugly 
to use. Uneededly. And to extend if needed only needs a cast. In case of 
resetable.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520970
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+/**
+ * Extends MultiIterator, adding the ability if the underlying iterators 
are resettable, then its self can reset.
+ * It achieves this by going back to the first iterator, and as moves to 
another iterator it resets it.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiResettableIterator extends MultiIterator 
implements ResettableIterator {
--- End diff --

You could but this then makes using MutiIterator ugly as hell with little 
gain


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520411
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  ResettableIterator[] 
newResettableIteratorArrayInstance(int length) {
+  

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520138
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  ResettableIterator[] 
newResettableIteratorArrayInstance(int length) {
+  

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520110
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
--- End diff --

So it is a good reason to save using a `compareAndSet` that's not only 
expensive but give the false illusion that the code is thread-safe


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246520121
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  ResettableIterator[] 
newResettableIteratorArrayInstance(int length) {
+  

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246518747
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
--- End diff --

Nice


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246518669
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -84,6 +94,10 @@ public void setFilterString(SimpleString filterString) {
   this.filterString = filterString;
}
 
+   public void setPriority(byte priority) {
--- End diff --

Keeping inline with the rest of the fields and general approach. Id rather 
keep to status quo


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246518313
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
--- End diff --

If it fails it means whilst we were reseting and changing reference to the 
new iterator. a consumer was added or removed and another new iterator was 
created (as this can occur whilsy we are iterating or resetting.) And means 
simply whilst this reset updated its iterator, on next reset it needs to switch 
over its again. We wouldnt want to lazy set to null 


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246516870
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
--- End diff --

Like any iterator and iterator should only be interacted by one thread at a 
time.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246516479
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
+  }
+  return this;
+   }
+
+   @Override
+   public boolean add(T t) {
+  boolean result = consumers.add(t);
+  if (result) {
+ changedIteratorFieldUpdater.set(this, 
consumers.resettableIterator());
+  }
+  return result;
+   }
+
+   @Override
+   public boolean remove(T t) {
+  boolean result = consumers.remove(t);
+  if (result) {
+ changedIteratorFieldUpdater.set(this, 
consumers.resettableIterator());
--- End diff --

Agreed. But this is not hot path, and if anything would allow in future for 
consumers to be concurrently added or removed without the existing bit sync 
blocks there is today in queueimpl


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246515810
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ---
@@ -2409,14 +2410,10 @@ private void deliver() {
   // Either the iterator is empty or the consumer is busy
   int noDelivery = 0;
 
-  int size = 0;
-
-  int endPos = -1;
-
   int handled = 0;
 
   long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
-
+  consumers.reset();
--- End diff --

Its fine. We actually only want to reset on a succesful handled nomatch or 
expired. Or before starting iterating


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246515455
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ---
@@ -2497,45 +2484,36 @@ private void deliver() {
 
 
   handled++;
-
+  consumers.reset();
   continue;
}
 
if (logger.isTraceEnabled()) {
   logger.trace("Queue " + this.getName() + " is delivering 
reference " + ref);
}
 
-   // If a group id is set, then this overrides the consumer 
chosen round-robin
-
-   SimpleString groupID = extractGroupID(ref);
-
-   if (groupID != null) {
-  groupConsumer = groups.get(groupID);
+   final SimpleString groupID = extractGroupID(ref);
+   groupConsumer = getGroupConsumer(groupConsumer, groupID);
 
-  if (groupConsumer != null) {
- consumer = groupConsumer;
-  }
-   }
-
-   if (exclusive && redistributor == null) {
-  consumer = consumerList.get(0).consumer;
+   if (groupConsumer != null) {
+  consumer = groupConsumer;
}
 
HandleStatus status = handle(ref, consumer);
 
if (status == HandleStatus.HANDLED) {
 
-  deliveriesInTransit.countUp();
-
-  handledconsumer = consumer;
-
-  removeMessageReference(holder, ref);
-
   if (redistributor == null) {
  handleMessageGroup(ref, consumer, groupConsumer, 
groupID);
   }
 
+  deliveriesInTransit.countUp();
+
+
+  removeMessageReference(holder, ref);
+  handledconsumer = consumer;
   handled++;
+  consumers.reset();
--- End diff --

Yes its fine. Would have no negative effect, and actually have same 
behaviour as old. 


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246513536
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  ResettableIterator[] 
newResettableIteratorArrayInstance(int length) {
+  

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246416127
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 5);
+  AmqpReceiver receiver1 = session.createReceiver(getQueueName(), 
null, false, false, properties1);
+  receiver1.flow(100);
+
+  Map properties2 = new HashMap<>();
+  properties2.put(Symbol.getSymbol("priority"), 50);
+  AmqpReceiver receiver2 = session.createReceiver(getQueueName(), 
null, false, false, properties2);
+  receiver2.flow(100);
+
+  Map properties3 = new HashMap<>();
+  properties3.put(Symbol.getSymbol("priority"), 10);
+  AmqpReceiver receiver3 = session.createReceiver(getQueueName(), 
null, false, false, properties3);
+  receiver3.flow(100);
+
+  sendMessages(getQueueName(), 5);
+
+
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message1 = receiver1.receiveNoWait();
--- End diff --

I believe the test clients receiveNoWait only polls its local queue, so 
this might now likely sporadically fail due to racing the deliveries. I was 
only suggesting recieveNoWait as potential initial verification within the loop 
for non-receiving consumers, to be followed up or substituted by a more 
stringent final check.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246398611
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -104,6 +119,11 @@ public void decodeRest(final ActiveMQBuffer buffer) {
   filterString = buffer.readNullableSimpleString();
   browseOnly = buffer.readBoolean();
   requiresResponse = buffer.readBoolean();
+  if (buffer.readableBytes() > 0) {
--- End diff --

Does this comment cover to the bit around the safety of always sending the 
new additional data even to old servers? I can't tell if its covered.

I think it should at the very least be commented what/when the 
encoding+decoding handling behaviour changed so folks can understand the 
implications later without heading to find past commits.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246403272
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java
 ---
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+
+import 
org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Before;
+import org.junit.Test;
+
+public class QueueConsumerPriorityTest extends BasicOpenWireTest {
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+  super.setUp();
+  this.makeSureCoreQueueExist("QUEUE.A");
+   }
+   @Test
+   public void testQueueConsumerPriority() throws JMSException, 
InterruptedException {
+  connection.start();
+  Session consumerLowPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  Session consumerHighPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  assertNotNull(consumerHighPriority);
+  Session senderSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  String queueName = "QUEUE.A";
+  ActiveMQQueue low = new ActiveMQQueue(queueName + 
"?consumer.priority=1");
+  MessageConsumer lowConsumer = 
consumerLowPriority.createConsumer(low);
+
+  ActiveMQQueue high = new ActiveMQQueue(queueName + 
"?consumer.priority=2");
+  MessageConsumer highConsumer = 
consumerLowPriority.createConsumer(high);
+
+  ActiveMQQueue senderQueue = new ActiveMQQueue(queueName);
+
+  MessageProducer producer = senderSession.createProducer(senderQueue);
+
+  Message msg = senderSession.createTextMessage("test");
+  for (int i = 0; i < 1000; i++) {
+ producer.send(msg);
+ assertNotNull("null on iteration: " + i, 
highConsumer.receive(1000));
+  }
+  assertNull(lowConsumer.receive(2000));
--- End diff --

I don't personally think this is a case which warrants keeping poor 
behaviour 'for consistency' when there are various essentially equivalent 
checks/assertions the test could do which don't require wasting 2 seconds. 
Having maybe run it once to ensure it worked, I'd change it.

The ActiveMQ 5 test suite is an even better example of a test suite so slow 
(due to things like this) that folks don't actually want to run it.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246383235
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -113,6 +133,7 @@ public int hashCode() {
   result = prime * result + (browseOnly ? 1231 : 1237);
   result = prime * result + ((filterString == null) ? 0 : 
filterString.hashCode());
   result = prime * result + (int) (id ^ (id >>> 32));
+  result = prime * result + priority;
--- End diff --

I would uses `Integer::hashCode(priority)` that would `agitate` the value 


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246382311
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  ResettableIterator[] 
newResettableIteratorArrayInstance(int length) {
+  return 

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246337126
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  ResettableIterator[] 
newResettableIteratorArrayInstance(int length) {
+  return 

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246338675
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityCollection.java
 ---
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+
+import java.lang.reflect.Array;
+import java.util.AbstractCollection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * This class's purpose is to hold the the different collections used for 
each priority level.
+ *
+ * A supplier is required to provide the underlying collection needed when 
a new priority level is seen,
+ * and the end behaviour is that of the underlying collection, e.g. if set 
add will follow set's add semantics,
+ * if list, then list semantics.
+ *
+ * Methods getArray, setArray MUST never be exposed, and all array 
modifications must go through these.
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware.
+ */
+public class PriorityCollection extends 
AbstractCollection {
+
+   private final Supplier> supplier;
+   private volatile PriorityHolder[] priorityHolders = 
newPrioritySetArrayInstance(0);
+   private volatile int size;
+
+   private void setArray(PriorityHolder[] priorityHolders) {
+  this.priorityHolders = priorityHolders;
+   }
+
+   private PriorityHolder[] getArray() {
+  return priorityHolders;
+   }
+
+
+   public PriorityCollection(Supplier> supplier) {
+  this.supplier = supplier;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  PriorityHolder[] newPrioritySetArrayInstance(int 
length) {
+  return (PriorityHolder[]) Array.newInstance(PriorityHolder.class, 
length);
+   }
+
+   @Override
+   public int size() {
+  return size;
+   }
+
+   @Override
+   public boolean isEmpty() {
+  return size() == 0;
+   }
+
+   public Set getPriorites() {
+  PriorityHolder[] snapshot = getArray();
+  return 
Arrays.stream(snapshot).map(PriorityAware::getPriority).collect(Collectors.toSet());
+   }
+
+   @Override
+   public Iterator iterator() {
+  Iterator[] iterators = getIterators();
+  return new MultiIterator<>(iterators);
+   }
+
+   private Iterator[] getIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  Iterator[] iterators = newIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = snapshot[i].getValues().iterator();
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  Iterator[] newIteratorArrayInstance(int length) {
+  return (Iterator[]) Array.newInstance(Iterator.class, length);
+   }
+
+   public ResettableIterator resettableIterator() {
+  return new MultiResettableIterator(getResettableIterators());
+   }
+
+   private ResettableIterator[] getResettableIterators() {
+  PriorityHolder[] snapshot = this.getArray();
+  int size = snapshot.length;
+  ResettableIterator[] iterators = 
newResettableIteratorArrayInstance(size);
+  for (int i = 0; i < size; i++) {
+ iterators[i] = new 
ArrayResettableIterator<>(snapshot[i].getValues().toArray());
+  }
+  return iterators;
+   }
+
+   @SuppressWarnings("unchecked")
+   private static  ResettableIterator[] 
newResettableIteratorArrayInstance(int length) {
+  return 

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246377289
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ---
@@ -2409,14 +2410,10 @@ private void deliver() {
   // Either the iterator is empty or the consumer is busy
   int noDelivery = 0;
 
-  int size = 0;
-
-  int endPos = -1;
-
   int handled = 0;
 
   long timeout = System.currentTimeMillis() + DELIVERY_TIMEOUT;
-
+  consumers.reset();
--- End diff --

if any exception would be thrown before it is ok that `reset` won't be 
called? 
If not, better to wrap the whole logic with ìtry..finally` and `reset` 


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246374340
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
--- End diff --

I will just save `changedIterator` once and will perform a logic with a 
local value instead of volatile load twice


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246313269
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiIterator implements Iterator {
+
+   private final Iterator[] iterators;
+   int index = -1;
--- End diff --

`private` given that seems that isn't needed by a child


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246323500
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiIterator implements Iterator {
+
+   private final Iterator[] iterators;
+   int index = -1;
+
+   public MultiIterator(Iterator[] iterators) {
--- End diff --

`I[]`


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246375651
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
--- End diff --

What happen if the cas will fail? we ends with a reset() that is not 
nulling `changedIterator`.
Given that we are just clearing the `changedIterator` to `null` a 
`lazySet(this, null) is enough.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246323998
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+/**
+ * Extends MultiIterator, adding the ability if the underlying iterators 
are resettable, then its self can reset.
+ * It achieves this by going back to the first iterator, and as moves to 
another iterator it resets it.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiResettableIterator extends MultiIterator 
implements ResettableIterator {
+
+   public MultiResettableIterator(ResettableIterator[] iterators) {
+  super(iterators);
+   }
+
+   @Override
+   protected void moveTo(int index) {
+  super.moveTo(index);
+  if (index > -1) {
+ ((ResettableIterator) get(index)).reset();
--- End diff --

this cast could be avoided thanks to the changes on generics on 
`MultiIterator`


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246376376
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
+  }
+  return this;
+   }
+
+   @Override
+   public boolean add(T t) {
+  boolean result = consumers.add(t);
+  if (result) {
+ changedIteratorFieldUpdater.set(this, 
consumers.resettableIterator());
+  }
+  return result;
+   }
+
+   @Override
+   public boolean remove(T t) {
+  boolean result = consumers.remove(t);
+  if (result) {
+ changedIteratorFieldUpdater.set(this, 
consumers.resettableIterator());
--- End diff --

`lazeSet` is enough for single-writer/single-threaded semantic 


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246323602
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiIterator implements Iterator {
+
+   private final Iterator[] iterators;
+   int index = -1;
+
+   public MultiIterator(Iterator[] iterators) {
+  this.iterators = iterators;
+   }
+
+   @Override
+   public boolean hasNext() {
+  while (true) {
+ if (index != -1) {
+Iterator currentIterator = get(index);
+if (currentIterator.hasNext()) {
+   return true;
+}
+ }
+ int next = index + 1;
+ if (next < iterators.length) {
+moveTo(next);
+ } else {
+return false;
+ }
+  }
+   }
+
+   @Override
+   public T next() {
+  while (true) {
+ if (index != -1) {
+Iterator currentIterator = get(index);
+if (currentIterator.hasNext()) {
+   return currentIterator.next();
+}
+ }
+ int next = index + 1;
+ if (next < iterators.length) {
+moveTo(next);
+ } else {
+return null;
+ }
+  }
+   }
+
+   protected void moveTo(int index) {
+  this.index = index;
+   }
+
+   protected Iterator get(int index) {
--- End diff --

```java
protected I get(int index) 
```


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246322551
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiIterator implements Iterator {
+
+   private final Iterator[] iterators;
--- End diff --

`I[]`: comments above


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246344280
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -84,6 +94,10 @@ public void setFilterString(SimpleString filterString) {
   this.filterString = filterString;
}
 
+   public void setPriority(byte priority) {
--- End diff --

this modifier is needed? I can't see it to be called by anyone: if not 
`priority` filed could be declared as `final`


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246377787
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 ---
@@ -2497,45 +2484,36 @@ private void deliver() {
 
 
   handled++;
-
+  consumers.reset();
   continue;
}
 
if (logger.isTraceEnabled()) {
   logger.trace("Queue " + this.getName() + " is delivering 
reference " + ref);
}
 
-   // If a group id is set, then this overrides the consumer 
chosen round-robin
-
-   SimpleString groupID = extractGroupID(ref);
-
-   if (groupID != null) {
-  groupConsumer = groups.get(groupID);
+   final SimpleString groupID = extractGroupID(ref);
+   groupConsumer = getGroupConsumer(groupConsumer, groupID);
 
-  if (groupConsumer != null) {
- consumer = groupConsumer;
-  }
-   }
-
-   if (exclusive && redistributor == null) {
-  consumer = consumerList.get(0).consumer;
+   if (groupConsumer != null) {
+  consumer = groupConsumer;
}
 
HandleStatus status = handle(ref, consumer);
 
if (status == HandleStatus.HANDLED) {
 
-  deliveriesInTransit.countUp();
-
-  handledconsumer = consumer;
-
-  removeMessageReference(holder, ref);
-
   if (redistributor == null) {
  handleMessageGroup(ref, consumer, groupConsumer, 
groupID);
   }
 
+  deliveriesInTransit.countUp();
+
+
+  removeMessageReference(holder, ref);
+  handledconsumer = consumer;
   handled++;
+  consumers.reset();
--- End diff --

if `removeMessageReference` would throw any exeption is fine to have 
`consumers.reset` not called?
if not, uses `try...finally`


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246321398
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiIterator.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Iterator;
+
+/**
+ * Provides an Iterator that works over multiple underlying iterators.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiIterator implements Iterator {
--- End diff --

`MultiIterator, T> implements Iterator`
It should avoid casting  on children while manipulating `iterators` type


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246376356
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
+  }
+  return this;
+   }
+
+   @Override
+   public boolean add(T t) {
+  boolean result = consumers.add(t);
+  if (result) {
+ changedIteratorFieldUpdater.set(this, 
consumers.resettableIterator());
--- End diff --

`lazeSet` is enough for single-writer/single-threaded semantic 


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246323811
  
--- Diff: 
artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/MultiResettableIterator.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+/**
+ * Extends MultiIterator, adding the ability if the underlying iterators 
are resettable, then its self can reset.
+ * It achieves this by going back to the first iterator, and as moves to 
another iterator it resets it.
+ *
+ * @param  type of the class of the iterator.
+ */
+public class MultiResettableIterator extends MultiIterator 
implements ResettableIterator {
--- End diff --

```java
public class MultiResettableIterator extends 
MultiIterator, T> implements ResettableIterator {
```


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-09 Thread franz1981
Github user franz1981 commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246376178
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.core.PriorityAware;
+import org.apache.activemq.artemis.utils.collections.PriorityCollection;
+import org.apache.activemq.artemis.utils.collections.ResettableIterator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.function.Consumer;
+
+/**
+ * This class's purpose is to hold the consumers.
+ *
+ * CopyOnWriteArraySet is used as the underlying collection to the 
PriorityCollection, as it is concurrent safe,
+ * but also lock less for a read path, which is our HOT path.
+ * Also it was the underlying collection previously used in QueueImpl, 
before we abstracted it out to support priority consumers.
+ *
+ * There can only be one resettable iterable view,
+ * A new iterable view is created on modification, this is to keep the 
read HOT path performent, BUT
+ * the iterable view changes only after reset so changes in the underlying 
collection are only seen after a reset,
+ *
+ * All other iterators created by iterators() method are not reset-able 
and are created on delegating iterator().
+ *
+ * @param  The type this class may hold, this is generic as can be 
anything that extends PriorityAware,
+ * but intent is this is the QueueImpl:ConsumerHolder.
+ */
+public class QueueConsumersImpl implements 
QueueConsumers {
+
+   private final PriorityCollection consumers = new 
PriorityCollection<>(CopyOnWriteArraySet::new);
+   private final Collection unmodifiableConsumers = 
Collections.unmodifiableCollection(consumers);
+   private final AtomicReferenceFieldUpdater changedIteratorFieldUpdater =  
AtomicReferenceFieldUpdater.newUpdater(QueueConsumersImpl.class, 
ResettableIterator.class, "changedIterator");
+   private volatile ResettableIterator changedIterator;
+   private ResettableIterator currentIterator = 
consumers.resettableIterator();
+
+   @Override
+   public Set getPriorites() {
+  return consumers.getPriorites();
+   }
+
+   @Override
+   public boolean hasNext() {
+  return currentIterator.hasNext();
+   }
+
+   @Override
+   public T next() {
+  return currentIterator.next();
+   }
+
+   @Override
+   public QueueConsumers reset() {
+  if (changedIterator != null) {
+ currentIterator = changedIterator;
+ changedIteratorFieldUpdater.compareAndSet(this, changedIterator, 
null);
+  } else {
+ currentIterator.reset();
--- End diff --

I suppose that `reset` is safe to be called just by one thread at time, if 
not, it would be complex because `currentIterator` could be changed 
indipendently


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140306
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
+  AmqpReceiver receiver1 = session.createReceiver(getQueueName(), 
null, false, false, properties1);
+  receiver1.flow(100);
+
+  Map properties2 = new HashMap<>();
+  properties2.put(Symbol.getSymbol("priority"), 10);
+  AmqpReceiver receiver2 = session.createReceiver(getQueueName(), 
null, false, false, properties2);
+  receiver2.flow(100);
+
+  Map properties3 = new HashMap<>();
+  properties3.put(Symbol.getSymbol("priority"), 5);
+  AmqpReceiver receiver3 = session.createReceiver(getQueueName(), 
null, false, false, properties3);
+  receiver3.flow(100);
+
+  sendMessages(getQueueName(), 5);
+
+
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message1 = receiver1.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message3 = receiver3.receive(250, 
TimeUnit.MILLISECONDS);
+ assertNotNull("did not receive message first time", message1);
+ assertEquals("MessageID:" + i, message1.getMessageId());
+ message1.accept();
+ assertNull("message is not meant to goto lower priority 
receiver", message2);
+ assertNull("message is not meant to goto lower priority 
receiver", message3);
+  }
+
+  //Close the high priority receiver
+  receiver1.close();
+
+  sendMessages(getQueueName(), 5);
+
+  //Check messages now goto next priority receiver
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message3 = receiver3.receive(250, 
TimeUnit.MILLISECONDS);
--- End diff --

changed to receiveNoWait()


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140214
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
--- End diff --

changed up the ordering in this test also.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140041
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
+  AmqpReceiver receiver1 = session.createReceiver(getQueueName(), 
null, false, false, properties1);
+  receiver1.flow(100);
+
+  Map properties2 = new HashMap<>();
+  properties2.put(Symbol.getSymbol("priority"), 10);
+  AmqpReceiver receiver2 = session.createReceiver(getQueueName(), 
null, false, false, properties2);
+  receiver2.flow(100);
+
+  Map properties3 = new HashMap<>();
+  properties3.put(Symbol.getSymbol("priority"), 5);
+  AmqpReceiver receiver3 = session.createReceiver(getQueueName(), 
null, false, false, properties3);
+  receiver3.flow(100);
+
+  sendMessages(getQueueName(), 5);
+
+
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message1 = receiver1.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
--- End diff --

changed to receiveNoWait


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135818
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+/**
+ * Exclusive Test
+ */
+public class ConsumerPriorityTest extends JMSTestBase {
+
+   private SimpleString queueName = 
SimpleString.toSimpleString("jms.consumer.priority.queue");
--- End diff --

now using getName ... again nice nice


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135697
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 ---
@@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext 
protonSender,
   return consumer;
}
 
+   private int getPriority(Map properties) {
+  Integer value = properties == null ? null : (Integer) 
properties.get(PRIORITY);
--- End diff --

Have changed if you can recheck to make sure i understood.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135542
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -52,6 +57,7 @@ public String toString() {
   StringBuffer buff = new StringBuffer(getParentString());
   buff.append(", queueName=" + queueName);
   buff.append(", filterString=" + filterString);
+  buff.append(", priority=" + priority);
--- End diff --

done


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246133832
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+/**
+ * Exclusive Test
+ */
+public class ConsumerPriorityTest extends JMSTestBase {
+
+   private SimpleString queueName = 
SimpleString.toSimpleString("jms.consumer.priority.queue");
--- End diff --

Nice i didnt know about that in the parent class. will change to use this..


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246132135
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java
 ---
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+
+import 
org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Before;
+import org.junit.Test;
+
+public class QueueConsumerPriorityTest extends BasicOpenWireTest {
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+  super.setUp();
+  this.makeSureCoreQueueExist("QUEUE.A");
+   }
+   @Test
+   public void testQueueConsumerPriority() throws JMSException, 
InterruptedException {
+  connection.start();
+  Session consumerLowPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  Session consumerHighPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  assertNotNull(consumerHighPriority);
+  Session senderSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  String queueName = "QUEUE.A";
+  ActiveMQQueue low = new ActiveMQQueue(queueName + 
"?consumer.priority=1");
+  MessageConsumer lowConsumer = 
consumerLowPriority.createConsumer(low);
+
+  ActiveMQQueue high = new ActiveMQQueue(queueName + 
"?consumer.priority=2");
+  MessageConsumer highConsumer = 
consumerLowPriority.createConsumer(high);
+
+  ActiveMQQueue senderQueue = new ActiveMQQueue(queueName);
+
+  MessageProducer producer = senderSession.createProducer(senderQueue);
+
+  Message msg = senderSession.createTextMessage("test");
+  for (int i = 0; i < 1000; i++) {
+ producer.send(msg);
+ assertNotNull("null on iteration: " + i, 
highConsumer.receive(1000));
+  }
+  assertNull(lowConsumer.receive(2000));
--- End diff --

this is the original test from ActiveMQ5 i was trying to keep this test as 
much un-touched as possible to ensure behavior is the same.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246128551
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
--- End diff --

This is actually tested on the queueconsumerimpl test. But agree we can do 
same here


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246128143
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -104,6 +119,11 @@ public void decodeRest(final ActiveMQBuffer buffer) {
   filterString = buffer.readNullableSimpleString();
   browseOnly = buffer.readBoolean();
   requiresResponse = buffer.readBoolean();
+  if (buffer.readableBytes() > 0) {
--- End diff --

This is typical pattern used for adding safely a new field that can be 
either nullable or defaultable. Used many times over.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246127864
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -52,6 +57,7 @@ public String toString() {
   StringBuffer buff = new StringBuffer(getParentString());
   buff.append(", queueName=" + queueName);
   buff.append(", filterString=" + filterString);
+  buff.append(", priority=" + priority);
--- End diff --

Makes sense


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245974624
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
--- End diff --

I'd suggest creating consumers with priorities out of order (e.g highest in 
middle), so they arent simply registered in sequence, as otherwise a simple 
failure to round-robin delivery attempts (given every receiver has enough 
credit to receive all messages) might also lead to the expected result even 
without any priority handling consideration.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245973668
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
+  AmqpReceiver receiver1 = session.createReceiver(getQueueName(), 
null, false, false, properties1);
+  receiver1.flow(100);
+
+  Map properties2 = new HashMap<>();
+  properties2.put(Symbol.getSymbol("priority"), 10);
+  AmqpReceiver receiver2 = session.createReceiver(getQueueName(), 
null, false, false, properties2);
+  receiver2.flow(100);
+
+  Map properties3 = new HashMap<>();
+  properties3.put(Symbol.getSymbol("priority"), 5);
+  AmqpReceiver receiver3 = session.createReceiver(getQueueName(), 
null, false, false, properties3);
+  receiver3.flow(100);
+
+  sendMessages(getQueueName(), 5);
+
+
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message1 = receiver1.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
--- End diff --

Burning 250ms twice per loop seems excessive. There is a receiveNoWait that 
could be used for initial verification nothing arrived, and/or a small final 
timed wait could be done outside the loop afterwards. Alternatively, 
pullImmediate() would avoid unnecessary waiting.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245968414
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+/**
+ * Exclusive Test
+ */
+public class ConsumerPriorityTest extends JMSTestBase {
+
+   private SimpleString queueName = 
SimpleString.toSimpleString("jms.consumer.priority.queue");
--- End diff --

Rather than hard coding a shared name, using the test name for the queue 
name is nice as it isolates different tests and makes the relationship clear, 
sometimes makes it easier to work on issues later with particular tests. There 
is a test name rule in the parent class, and a getName() method that can be 
used with it.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245955337
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -104,6 +119,11 @@ public void decodeRest(final ActiveMQBuffer buffer) {
   filterString = buffer.readNullableSimpleString();
   browseOnly = buffer.readBoolean();
   requiresResponse = buffer.readBoolean();
+  if (buffer.readableBytes() > 0) {
--- End diff --

I assume this is to allow for old clients that don't send this value. Would 
a more specific version check be clearer here for later reference? Related, I'm 
guessing other changes already made for 2.7.0 have updated the version info 
since it doesn't look to change here?

Also, is the reverse case safe, does an older server failing to read the 
additional value (seemingly always sent now) have potential to lead to any 
issues on older servers, i.e how might the buffer continue to be used later if 
at all? Should the client omit the value for older servers? (Or does the 
presumed version change prevent the new client working with the old server 
anyway? I don't know how that stuff is handled, just commenting from reading 
the diff here).


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245973707
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
+  AmqpReceiver receiver1 = session.createReceiver(getQueueName(), 
null, false, false, properties1);
+  receiver1.flow(100);
+
+  Map properties2 = new HashMap<>();
+  properties2.put(Symbol.getSymbol("priority"), 10);
+  AmqpReceiver receiver2 = session.createReceiver(getQueueName(), 
null, false, false, properties2);
+  receiver2.flow(100);
+
+  Map properties3 = new HashMap<>();
+  properties3.put(Symbol.getSymbol("priority"), 5);
+  AmqpReceiver receiver3 = session.createReceiver(getQueueName(), 
null, false, false, properties3);
+  receiver3.flow(100);
+
+  sendMessages(getQueueName(), 5);
+
+
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message1 = receiver1.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message3 = receiver3.receive(250, 
TimeUnit.MILLISECONDS);
+ assertNotNull("did not receive message first time", message1);
+ assertEquals("MessageID:" + i, message1.getMessageId());
+ message1.accept();
+ assertNull("message is not meant to goto lower priority 
receiver", message2);
+ assertNull("message is not meant to goto lower priority 
receiver", message3);
+  }
+
+  //Close the high priority receiver
+  receiver1.close();
+
+  sendMessages(getQueueName(), 5);
+
+  //Check messages now goto next priority receiver
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message3 = receiver3.receive(250, 
TimeUnit.MILLISECONDS);
--- End diff --

As above.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245953999
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -52,6 +57,7 @@ public String toString() {
   StringBuffer buff = new StringBuffer(getParentString());
   buff.append(", queueName=" + queueName);
   buff.append(", filterString=" + filterString);
+  buff.append(", priority=" + priority);
--- End diff --

Nitpicking, the other details seem to be emitted 'in order' relative to the 
buffer content, so would it make sense to put this at the end consistent with 
its location?


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245965929
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 ---
@@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext 
protonSender,
   return consumer;
}
 
+   private int getPriority(Map properties) {
+  Integer value = properties == null ? null : (Integer) 
properties.get(PRIORITY);
--- End diff --

Comments on the original #2488 PR suggest you want to align with Qpid 
Broker-J in this area. Its support (and the accompanying documentation lift) 
notes as an integral value, so the value here is not necessarily going to be 
the Integer type.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245972322
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java
 ---
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+
+import 
org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Before;
+import org.junit.Test;
+
+public class QueueConsumerPriorityTest extends BasicOpenWireTest {
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+  super.setUp();
+  this.makeSureCoreQueueExist("QUEUE.A");
+   }
+   @Test
+   public void testQueueConsumerPriority() throws JMSException, 
InterruptedException {
+  connection.start();
+  Session consumerLowPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  Session consumerHighPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  assertNotNull(consumerHighPriority);
+  Session senderSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  String queueName = "QUEUE.A";
+  ActiveMQQueue low = new ActiveMQQueue(queueName + 
"?consumer.priority=1");
+  MessageConsumer lowConsumer = 
consumerLowPriority.createConsumer(low);
+
+  ActiveMQQueue high = new ActiveMQQueue(queueName + 
"?consumer.priority=2");
+  MessageConsumer highConsumer = 
consumerLowPriority.createConsumer(high);
+
+  ActiveMQQueue senderQueue = new ActiveMQQueue(queueName);
+
+  MessageProducer producer = senderSession.createProducer(senderQueue);
+
+  Message msg = senderSession.createTextMessage("test");
+  for (int i = 0; i < 1000; i++) {
+ producer.send(msg);
+ assertNotNull("null on iteration: " + i, 
highConsumer.receive(1000));
+  }
+  assertNull(lowConsumer.receive(2000));
--- End diff --

Would a receiveNoWait (either in or outside the loop) like the other tests 
be nicer than burning 2 seconds? Slow tests is a key reason eventually noone 
wants to runs the tests :)


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-07 Thread michaelandrepearce
GitHub user michaelandrepearce opened a pull request:

https://github.com/apache/activemq-artemis/pull/2490

V2 196

@franz1981 an alternative so we don't have to have a copy of 
CopyOnWriteArrayList, it does mean on add or remove consumer we have to invoke 
toArray which causes a copy, but this is not on hot path, so i think we should 
be good, and avoids us having to clone a jvm class.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michaelandrepearce/activemq-artemis V2-196

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2490.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2490


commit d731ffe7288cb857fef1b97deff4b7dc18aeb6d7
Author: Michael André Pearce 
Date:   2018-12-31T13:22:02Z

ARTEMIS-196 Implement Consumer Priority

Add consumer priority support
Includes refactor of consumer iterating in QueueImpl to its own logical 
class, to be able to implement.
Add OpenWire JMS Test - taken from ActiveMQ5
Add Core JMS Test
Add AMQP Test
Add Docs

commit b0c775840fc98b5d3f5f3485802de3270c614d9a
Author: Michael André Pearce 
Date:   2019-01-05T09:48:24Z

Extract




---