[GitHub] activemq-artemis issue #2492: ARTEMIS-2222 why the position remains unchange...

2019-01-08 Thread CNNJYB
Github user CNNJYB commented on the issue:

https://github.com/apache/activemq-artemis/pull/2492
  
> This had a specific purpose so we ignore deleted or completed pages, so 
we shouldnt be getting the message position.
> 
> As per comment in code.
> 
> ```
> // any deleted or complete page will be ignored on the 
moveNextPage, we will just keep going
> ```
> Is there visible bug that needs addressing? We shouldnt just be changing 
code, without a specific testable bug we're trying to resolve.



> This had a specific purpose so we ignore deleted or completed pages, so 
we shouldnt be getting the message position.
> 
> As per comment in code.
> 
> ```
> // any deleted or complete page will be ignored on the 
moveNextPage, we will just keep going
> ```
> Is there visible bug that needs addressing? We shouldnt just be changing 
code, without a specific testable bug we're trying to resolve.

If some pages have not been deleted (tb consumes these messages, ta has not 
consumed yet), CursorIterator:moveNext is called during queue depage. These 
messages will be traversed again. Especially during 
CursorProviderget:getPageCache, it takes time to readPage, if pages are not in 
the softCache collection.


---


[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...

2019-01-08 Thread onlyMIT
Github user onlyMIT commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2491#discussion_r246247094
  
--- Diff: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 ---
@@ -117,14 +118,11 @@ boolean getStopped() {
}
 
boolean isClean() {
-  return isClean;
+  return clean;
}
 
-   void setIsClean(boolean isClean) throws Exception {
-  this.isClean = isClean;
-  if (isClean) {
- clean();
-  }
+   void setClean(boolean clean) throws Exception {
+  this.clean = clean;
--- End diff --

Look good to me!


---


Re: Random Access Queues, possible?

2019-01-08 Thread Andreas Mueller
AMQP 1.0 with transactional retirement would partly support my requirements but 
I still need the keys as I don’t want to load the complete queue. Also, 
transactional retirement associates a message outcome (settled/rejected) with a 
tx. Since this is built on top of the Core API, there must be some key/outcome 
pair that identifies the message. 

Or am I wrong?

> Am 08.01.2019 um 21:23 schrieb Clebert Suconic :
> 
> Perhaps what Andreas is looking is different. Supporting his API
> (Streams) on top of the broker, and instead of using internal APIs
> using an established API?
> 
> The current contract we have on the broker is based on the messages
> where you can call message.individualAck or message.ack (The protocol
> layers are using this call).
> 
> I would think all you need is to find a way to locate a message with a
> given ID for plugin your API?
> 
>> On Tue, Jan 8, 2019 at 1:32 PM Arthur Naseef  wrote:
>> 
>> I agree.  Messaging and Database patterns are very different, with
>> different optimizations and considerations.
>> 
>> That's why folks often hear me repeat a part of a Jeff Genender's
>> presentation  - "don't use ActiveMQ as a message store".
>> 
>> Messaging is about moving messages as quickly as possible between
>> endpoints.  Databases, on the other hand, are oriented to solve "source of
>> truth" problems.  One example of where this becomes clear - ActiveMQ has
>> almost no means to randomly access messages, and those means that exist are
>> not good for production - they are only useful for testing and maybe
>> diagnostic purposes.
>> 
>> While it could be desirable from an application perspective to simplify the
>> application, having messages stored for long periods of time in messaging
>> middleware, that's not how ActiveMQ (or other messaging middleware) are
>> oriented.
>> 
>> With all of that said, I am curious to know what motivations exist to drive
>> this request.
>> 
>> Hope this helps.
>> 
>> Art
>> 
>> 
>> 
>> 
>> On Tue, Jan 8, 2019 at 10:23 AM Christopher Shannon <
>> christopher.l.shan...@gmail.com> wrote:
>> 
>>> Random access queues don't make sense for a message broker and is not
>>> supported.  Based on your use case Artemis or any message broker does not
>>> sound like the correct product.
>>> 
>>> It sounds like you need something like one of the many key/value stores
>>> that exist. (you can search around and see what's out there)
>>> 
 On Tue, Jan 8, 2019 at 7:17 AM Andreas Mueller  wrote:
 
 Hi,
 
 we have a sub project that currently runs within SwiftMQ as a plugin and
 uses SwiftMQ’s Swiftlet API to communicate with the internal components.
 I’m currently evaluating to port it to Artemis where it should run as a
 broker plugin. If that is possible with reasonable effort, we intend to
 make this sub project available as open source.
 
 This library uses queues as kind of database. We do not want to use a
>>> real
 database such as JDBC for it because we want it completely broker centric
 without dependencies and we want it transactional consistent, e.g. when a
 HA broker fails over, the data should be transactional save at the new HA
 instance.
 
 To accomplish this, we need random access to queues as specified in this
 little interface:
 
 public interface RandomAccessQueue {
/**
 * Returns keys of all messages in this queue.
 * @return List of keys
 */
List getKeys();
 
/**
 * Returns a message by its key. The message is not locked.
 * @param key
 * @return Message
 */
Message getMessageByKey(Object key);
 
/**
 * Locks all messages for removal by their key
 * @param txid Transaction id
 * @param keys List of keys
 */
void lockForRemoval(Object txid, List keys);
 
/**
 * Commits a transaction. Removes all messages that are locked in
>>> this
 transaction id.
 * @param txid Transaction id
 */
void commit(Object txid);
 
/**
 * Aborts a transaction. All messages locked for this transaction are
 simply unlocked.
 * @param txid Transaction id
 */
void abort(Object txid);
 }
 
 I’ve walked through the Artemis docs but did not find a way to do this.
 
 Can anyone tell me if that is possible? If yes, what are the implications
 in terms of performance if I get a message from an arbitrary position of
 the queue and remove it? I want to avoid a full scan of the transaction
 log, for example.
 
 Thanks!
 
 Regards,
 Andreas
 
 --
 Andreas Mueller
 IIT Software GmbH
 http://www.swiftmq.com
 
 
 
 
 
 IIT Software GmbH
 Falkenhorst 11, 48155 Münster, Germany
 Phone: +49 (0)251 39 72 99 00
 Managing Director: Andreas Müller
 District Court: 

Re: Random Access Queues, possible?

2019-01-08 Thread Andreas Mueller
Hi,

JMS with selectors and tx as well as AMQP with tx association of individual 
messages is supported so there must already be some kind of random queue 
access. May be it is just not exposed? 

Regards 
Andreas

> Am 08.01.2019 um 18:22 schrieb Christopher Shannon 
> :
> 
> Random access queues don't make sense for a message broker and is not
> supported.  Based on your use case Artemis or any message broker does not
> sound like the correct product.
> 
> It sounds like you need something like one of the many key/value stores
> that exist. (you can search around and see what's out there)
> 
>> On Tue, Jan 8, 2019 at 7:17 AM Andreas Mueller  wrote:
>> 
>> Hi,
>> 
>> we have a sub project that currently runs within SwiftMQ as a plugin and
>> uses SwiftMQ’s Swiftlet API to communicate with the internal components.
>> I’m currently evaluating to port it to Artemis where it should run as a
>> broker plugin. If that is possible with reasonable effort, we intend to
>> make this sub project available as open source.
>> 
>> This library uses queues as kind of database. We do not want to use a real
>> database such as JDBC for it because we want it completely broker centric
>> without dependencies and we want it transactional consistent, e.g. when a
>> HA broker fails over, the data should be transactional save at the new HA
>> instance.
>> 
>> To accomplish this, we need random access to queues as specified in this
>> little interface:
>> 
>> public interface RandomAccessQueue {
>>/**
>> * Returns keys of all messages in this queue.
>> * @return List of keys
>> */
>>List getKeys();
>> 
>>/**
>> * Returns a message by its key. The message is not locked.
>> * @param key
>> * @return Message
>> */
>>Message getMessageByKey(Object key);
>> 
>>/**
>> * Locks all messages for removal by their key
>> * @param txid Transaction id
>> * @param keys List of keys
>> */
>>void lockForRemoval(Object txid, List keys);
>> 
>>/**
>> * Commits a transaction. Removes all messages that are locked in this
>> transaction id.
>> * @param txid Transaction id
>> */
>>void commit(Object txid);
>> 
>>/**
>> * Aborts a transaction. All messages locked for this transaction are
>> simply unlocked.
>> * @param txid Transaction id
>> */
>>void abort(Object txid);
>> }
>> 
>> I’ve walked through the Artemis docs but did not find a way to do this.
>> 
>> Can anyone tell me if that is possible? If yes, what are the implications
>> in terms of performance if I get a message from an arbitrary position of
>> the queue and remove it? I want to avoid a full scan of the transaction
>> log, for example.
>> 
>> Thanks!
>> 
>> Regards,
>> Andreas
>> 
>> --
>> Andreas Mueller
>> IIT Software GmbH
>> http://www.swiftmq.com
>> 
>> 
>> 
>> 
>> 
>> IIT Software GmbH
>> Falkenhorst 11, 48155 Münster, Germany
>> Phone: +49 (0)251 39 72 99 00
>> Managing Director: Andreas Müller
>> District Court: Amtsgericht Münster, HRB 16294
>> VAT-No: DE199945912
>> 
>> This e-mail may contain confidential and/or privileged information. If you
>> are not the intended recipient (or have received this e-mail in error)
>> please notify the sender immediately and destroy this e-mail. Any
>> unauthorized copying, disclosure or distribution of the material in this
>> e-mail is strictly forbidden.
>> 
>> 



IIT Software GmbH
Falkenhorst 11, 48155 Münster, Germany
Phone: +49 (0)251 39 72 99 00
Managing Director: Andreas Müller
District Court: Amtsgericht Münster, HRB 16294
VAT-No: DE199945912

This e-mail may contain confidential and/or privileged information. If you are 
not the intended recipient (or have received this e-mail in error) please 
notify the sender immediately and destroy this e-mail. Any unauthorized 
copying, disclosure or distribution of the material in this e-mail is strictly 
forbidden.



[GitHub] activemq-artemis issue #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2490
  
@gemmellr thanks for review, if you could recheck the AMQPSessionCallback 
for me, to make sure i understood you. 

As for the Openwire test case, this was a simple port over of the existing 
activemq5 test case as untouched as possible, i agree we could reduce the time 
but id rather (at least for this release) keep it the same, so we can be sure 
feature works for openwire same as activemq5.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140306
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
+  AmqpReceiver receiver1 = session.createReceiver(getQueueName(), 
null, false, false, properties1);
+  receiver1.flow(100);
+
+  Map properties2 = new HashMap<>();
+  properties2.put(Symbol.getSymbol("priority"), 10);
+  AmqpReceiver receiver2 = session.createReceiver(getQueueName(), 
null, false, false, properties2);
+  receiver2.flow(100);
+
+  Map properties3 = new HashMap<>();
+  properties3.put(Symbol.getSymbol("priority"), 5);
+  AmqpReceiver receiver3 = session.createReceiver(getQueueName(), 
null, false, false, properties3);
+  receiver3.flow(100);
+
+  sendMessages(getQueueName(), 5);
+
+
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message1 = receiver1.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message3 = receiver3.receive(250, 
TimeUnit.MILLISECONDS);
+ assertNotNull("did not receive message first time", message1);
+ assertEquals("MessageID:" + i, message1.getMessageId());
+ message1.accept();
+ assertNull("message is not meant to goto lower priority 
receiver", message2);
+ assertNull("message is not meant to goto lower priority 
receiver", message3);
+  }
+
+  //Close the high priority receiver
+  receiver1.close();
+
+  sendMessages(getQueueName(), 5);
+
+  //Check messages now goto next priority receiver
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message3 = receiver3.receive(250, 
TimeUnit.MILLISECONDS);
--- End diff --

changed to receiveNoWait()


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140214
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
--- End diff --

changed up the ordering in this test also.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246140041
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
+  AmqpReceiver receiver1 = session.createReceiver(getQueueName(), 
null, false, false, properties1);
+  receiver1.flow(100);
+
+  Map properties2 = new HashMap<>();
+  properties2.put(Symbol.getSymbol("priority"), 10);
+  AmqpReceiver receiver2 = session.createReceiver(getQueueName(), 
null, false, false, properties2);
+  receiver2.flow(100);
+
+  Map properties3 = new HashMap<>();
+  properties3.put(Symbol.getSymbol("priority"), 5);
+  AmqpReceiver receiver3 = session.createReceiver(getQueueName(), 
null, false, false, properties3);
+  receiver3.flow(100);
+
+  sendMessages(getQueueName(), 5);
+
+
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message1 = receiver1.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
--- End diff --

changed to receiveNoWait


---


Re: Random Access Queues, possible?

2019-01-08 Thread Clebert Suconic
Perhaps what Andreas is looking is different. Supporting his API
(Streams) on top of the broker, and instead of using internal APIs
using an established API?

The current contract we have on the broker is based on the messages
where you can call message.individualAck or message.ack (The protocol
layers are using this call).

I would think all you need is to find a way to locate a message with a
given ID for plugin your API?

On Tue, Jan 8, 2019 at 1:32 PM Arthur Naseef  wrote:
>
> I agree.  Messaging and Database patterns are very different, with
> different optimizations and considerations.
>
> That's why folks often hear me repeat a part of a Jeff Genender's
> presentation  - "don't use ActiveMQ as a message store".
>
> Messaging is about moving messages as quickly as possible between
> endpoints.  Databases, on the other hand, are oriented to solve "source of
> truth" problems.  One example of where this becomes clear - ActiveMQ has
> almost no means to randomly access messages, and those means that exist are
> not good for production - they are only useful for testing and maybe
> diagnostic purposes.
>
> While it could be desirable from an application perspective to simplify the
> application, having messages stored for long periods of time in messaging
> middleware, that's not how ActiveMQ (or other messaging middleware) are
> oriented.
>
> With all of that said, I am curious to know what motivations exist to drive
> this request.
>
> Hope this helps.
>
> Art
>
>
>
>
> On Tue, Jan 8, 2019 at 10:23 AM Christopher Shannon <
> christopher.l.shan...@gmail.com> wrote:
>
> > Random access queues don't make sense for a message broker and is not
> > supported.  Based on your use case Artemis or any message broker does not
> > sound like the correct product.
> >
> > It sounds like you need something like one of the many key/value stores
> > that exist. (you can search around and see what's out there)
> >
> > On Tue, Jan 8, 2019 at 7:17 AM Andreas Mueller  wrote:
> >
> > > Hi,
> > >
> > > we have a sub project that currently runs within SwiftMQ as a plugin and
> > > uses SwiftMQ’s Swiftlet API to communicate with the internal components.
> > > I’m currently evaluating to port it to Artemis where it should run as a
> > > broker plugin. If that is possible with reasonable effort, we intend to
> > > make this sub project available as open source.
> > >
> > > This library uses queues as kind of database. We do not want to use a
> > real
> > > database such as JDBC for it because we want it completely broker centric
> > > without dependencies and we want it transactional consistent, e.g. when a
> > > HA broker fails over, the data should be transactional save at the new HA
> > > instance.
> > >
> > > To accomplish this, we need random access to queues as specified in this
> > > little interface:
> > >
> > > public interface RandomAccessQueue {
> > > /**
> > >  * Returns keys of all messages in this queue.
> > >  * @return List of keys
> > >  */
> > > List getKeys();
> > >
> > > /**
> > >  * Returns a message by its key. The message is not locked.
> > >  * @param key
> > >  * @return Message
> > >  */
> > > Message getMessageByKey(Object key);
> > >
> > > /**
> > >  * Locks all messages for removal by their key
> > >  * @param txid Transaction id
> > >  * @param keys List of keys
> > >  */
> > > void lockForRemoval(Object txid, List keys);
> > >
> > > /**
> > >  * Commits a transaction. Removes all messages that are locked in
> > this
> > > transaction id.
> > >  * @param txid Transaction id
> > >  */
> > > void commit(Object txid);
> > >
> > > /**
> > >  * Aborts a transaction. All messages locked for this transaction are
> > > simply unlocked.
> > >  * @param txid Transaction id
> > >  */
> > > void abort(Object txid);
> > > }
> > >
> > > I’ve walked through the Artemis docs but did not find a way to do this.
> > >
> > > Can anyone tell me if that is possible? If yes, what are the implications
> > > in terms of performance if I get a message from an arbitrary position of
> > > the queue and remove it? I want to avoid a full scan of the transaction
> > > log, for example.
> > >
> > > Thanks!
> > >
> > > Regards,
> > > Andreas
> > >
> > > --
> > > Andreas Mueller
> > > IIT Software GmbH
> > > http://www.swiftmq.com
> > >
> > >
> > >
> > >
> > >
> > > IIT Software GmbH
> > > Falkenhorst 11, 48155 Münster, Germany
> > > Phone: +49 (0)251 39 72 99 00
> > > Managing Director: Andreas Müller
> > > District Court: Amtsgericht Münster, HRB 16294
> > > VAT-No: DE199945912
> > >
> > > This e-mail may contain confidential and/or privileged information. If
> > you
> > > are not the intended recipient (or have received this e-mail in error)
> > > please notify the sender immediately and destroy this e-mail. Any
> > > unauthorized copying, disclosure or distribution of the material in this
> > > e-mail is strictly 

[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135818
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+/**
+ * Exclusive Test
+ */
+public class ConsumerPriorityTest extends JMSTestBase {
+
+   private SimpleString queueName = 
SimpleString.toSimpleString("jms.consumer.priority.queue");
--- End diff --

now using getName ... again nice nice


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135697
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 ---
@@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext 
protonSender,
   return consumer;
}
 
+   private int getPriority(Map properties) {
+  Integer value = properties == null ? null : (Integer) 
properties.get(PRIORITY);
--- End diff --

Have changed if you can recheck to make sure i understood.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246135542
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -52,6 +57,7 @@ public String toString() {
   StringBuffer buff = new StringBuffer(getParentString());
   buff.append(", queueName=" + queueName);
   buff.append(", filterString=" + filterString);
+  buff.append(", priority=" + priority);
--- End diff --

done


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246133832
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+/**
+ * Exclusive Test
+ */
+public class ConsumerPriorityTest extends JMSTestBase {
+
+   private SimpleString queueName = 
SimpleString.toSimpleString("jms.consumer.priority.queue");
--- End diff --

Nice i didnt know about that in the parent class. will change to use this..


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246132135
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java
 ---
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+
+import 
org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Before;
+import org.junit.Test;
+
+public class QueueConsumerPriorityTest extends BasicOpenWireTest {
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+  super.setUp();
+  this.makeSureCoreQueueExist("QUEUE.A");
+   }
+   @Test
+   public void testQueueConsumerPriority() throws JMSException, 
InterruptedException {
+  connection.start();
+  Session consumerLowPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  Session consumerHighPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  assertNotNull(consumerHighPriority);
+  Session senderSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  String queueName = "QUEUE.A";
+  ActiveMQQueue low = new ActiveMQQueue(queueName + 
"?consumer.priority=1");
+  MessageConsumer lowConsumer = 
consumerLowPriority.createConsumer(low);
+
+  ActiveMQQueue high = new ActiveMQQueue(queueName + 
"?consumer.priority=2");
+  MessageConsumer highConsumer = 
consumerLowPriority.createConsumer(high);
+
+  ActiveMQQueue senderQueue = new ActiveMQQueue(queueName);
+
+  MessageProducer producer = senderSession.createProducer(senderQueue);
+
+  Message msg = senderSession.createTextMessage("test");
+  for (int i = 0; i < 1000; i++) {
+ producer.send(msg);
+ assertNotNull("null on iteration: " + i, 
highConsumer.receive(1000));
+  }
+  assertNull(lowConsumer.receive(2000));
--- End diff --

this is the original test from ActiveMQ5 i was trying to keep this test as 
much un-touched as possible to ensure behavior is the same.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246128551
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
--- End diff --

This is actually tested on the queueconsumerimpl test. But agree we can do 
same here


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246128143
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -104,6 +119,11 @@ public void decodeRest(final ActiveMQBuffer buffer) {
   filterString = buffer.readNullableSimpleString();
   browseOnly = buffer.readBoolean();
   requiresResponse = buffer.readBoolean();
+  if (buffer.readableBytes() > 0) {
--- End diff --

This is typical pattern used for adding safely a new field that can be 
either nullable or defaultable. Used many times over.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r246127864
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -52,6 +57,7 @@ public String toString() {
   StringBuffer buff = new StringBuffer(getParentString());
   buff.append(", queueName=" + queueName);
   buff.append(", filterString=" + filterString);
+  buff.append(", priority=" + priority);
--- End diff --

Makes sense


---


[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...

2019-01-08 Thread jbertram
Github user jbertram commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2491#discussion_r246123082
  
--- Diff: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 ---
@@ -117,14 +118,11 @@ boolean getStopped() {
}
 
boolean isClean() {
-  return isClean;
+  return clean;
}
 
-   void setIsClean(boolean isClean) throws Exception {
-  this.isClean = isClean;
-  if (isClean) {
- clean();
-  }
+   void setClean(boolean clean) throws Exception {
+  this.clean = clean;
--- End diff --

I pushed an update to address the issue you identified. Thanks!


---


Re: Random Access Queues, possible?

2019-01-08 Thread Arthur Naseef
I agree.  Messaging and Database patterns are very different, with
different optimizations and considerations.

That's why folks often hear me repeat a part of a Jeff Genender's
presentation  - "don't use ActiveMQ as a message store".

Messaging is about moving messages as quickly as possible between
endpoints.  Databases, on the other hand, are oriented to solve "source of
truth" problems.  One example of where this becomes clear - ActiveMQ has
almost no means to randomly access messages, and those means that exist are
not good for production - they are only useful for testing and maybe
diagnostic purposes.

While it could be desirable from an application perspective to simplify the
application, having messages stored for long periods of time in messaging
middleware, that's not how ActiveMQ (or other messaging middleware) are
oriented.

With all of that said, I am curious to know what motivations exist to drive
this request.

Hope this helps.

Art




On Tue, Jan 8, 2019 at 10:23 AM Christopher Shannon <
christopher.l.shan...@gmail.com> wrote:

> Random access queues don't make sense for a message broker and is not
> supported.  Based on your use case Artemis or any message broker does not
> sound like the correct product.
>
> It sounds like you need something like one of the many key/value stores
> that exist. (you can search around and see what's out there)
>
> On Tue, Jan 8, 2019 at 7:17 AM Andreas Mueller  wrote:
>
> > Hi,
> >
> > we have a sub project that currently runs within SwiftMQ as a plugin and
> > uses SwiftMQ’s Swiftlet API to communicate with the internal components.
> > I’m currently evaluating to port it to Artemis where it should run as a
> > broker plugin. If that is possible with reasonable effort, we intend to
> > make this sub project available as open source.
> >
> > This library uses queues as kind of database. We do not want to use a
> real
> > database such as JDBC for it because we want it completely broker centric
> > without dependencies and we want it transactional consistent, e.g. when a
> > HA broker fails over, the data should be transactional save at the new HA
> > instance.
> >
> > To accomplish this, we need random access to queues as specified in this
> > little interface:
> >
> > public interface RandomAccessQueue {
> > /**
> >  * Returns keys of all messages in this queue.
> >  * @return List of keys
> >  */
> > List getKeys();
> >
> > /**
> >  * Returns a message by its key. The message is not locked.
> >  * @param key
> >  * @return Message
> >  */
> > Message getMessageByKey(Object key);
> >
> > /**
> >  * Locks all messages for removal by their key
> >  * @param txid Transaction id
> >  * @param keys List of keys
> >  */
> > void lockForRemoval(Object txid, List keys);
> >
> > /**
> >  * Commits a transaction. Removes all messages that are locked in
> this
> > transaction id.
> >  * @param txid Transaction id
> >  */
> > void commit(Object txid);
> >
> > /**
> >  * Aborts a transaction. All messages locked for this transaction are
> > simply unlocked.
> >  * @param txid Transaction id
> >  */
> > void abort(Object txid);
> > }
> >
> > I’ve walked through the Artemis docs but did not find a way to do this.
> >
> > Can anyone tell me if that is possible? If yes, what are the implications
> > in terms of performance if I get a message from an arbitrary position of
> > the queue and remove it? I want to avoid a full scan of the transaction
> > log, for example.
> >
> > Thanks!
> >
> > Regards,
> > Andreas
> >
> > --
> > Andreas Mueller
> > IIT Software GmbH
> > http://www.swiftmq.com
> >
> >
> >
> >
> >
> > IIT Software GmbH
> > Falkenhorst 11, 48155 Münster, Germany
> > Phone: +49 (0)251 39 72 99 00
> > Managing Director: Andreas Müller
> > District Court: Amtsgericht Münster, HRB 16294
> > VAT-No: DE199945912
> >
> > This e-mail may contain confidential and/or privileged information. If
> you
> > are not the intended recipient (or have received this e-mail in error)
> > please notify the sender immediately and destroy this e-mail. Any
> > unauthorized copying, disclosure or distribution of the material in this
> > e-mail is strictly forbidden.
> >
> >
>


Re: Board Report Time - New Year Edition

2019-01-08 Thread Bruce Snyder
A big thank you to both Jean-Baptiste Onofré and Michael André Pearce for
contributing to this report! Thanks for taking the time to outline some of
the activities on the project guys!

I will submit the report today to the ASF board for next week's meeting.

Bruce

On Sun, Dec 30, 2018 at 7:00 PM Bruce Snyder  wrote:

> Please contribute to the ASF board report for January 2019. All it takes
> is five minutes to report project activity for any of the ActiveMQ modules
> on the wiki page below:
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=99844486
>
> This report is due to by Jan 9, 2019.
>
> Happy New Year!
>
> Bruce
> --
> perl -e 'print
> unpack("u30","D0G)U8V4\@4VYY9&5R\"F)R=6-E+G-N>61E
> ActiveMQ in Action: http://bit.ly/2je6cQ
> Blog: http://bsnyder.org/ 
> Twitter: http://twitter.com/brucesnyder
>


-- 
perl -e 'print
unpack("u30","D0G)U8V4\@4VYY9&5R\"F)R=6-E+G-N>61Ehttp://bit.ly/2je6cQ
Blog: http://bsnyder.org/ 
Twitter: http://twitter.com/brucesnyder


[GitHub] activemq-artemis issue #2493: ARTEMIS-2223 when a new consumer is created, n...

2019-01-08 Thread onlyMIT
Github user onlyMIT commented on the issue:

https://github.com/apache/activemq-artemis/pull/2493
  
The test is correct.can close the jira and pull request


---


[GitHub] activemq-artemis pull request #2493: ARTEMIS-2223 when a new consumer is cre...

2019-01-08 Thread onlyMIT
Github user onlyMIT closed the pull request at:

https://github.com/apache/activemq-artemis/pull/2493


---


[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...

2019-01-08 Thread onlyMIT
Github user onlyMIT commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2491#discussion_r246095670
  
--- Diff: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 ---
@@ -117,14 +118,11 @@ boolean getStopped() {
}
 
boolean isClean() {
-  return isClean;
+  return clean;
}
 
-   void setIsClean(boolean isClean) throws Exception {
-  this.isClean = isClean;
-  if (isClean) {
- clean();
-  }
+   void setClean(boolean clean) throws Exception {
+  this.clean = clean;
--- End diff --

@jbertram in the getSessionState method.Only clear state,not call 
'clean()' method. In fact, the queue is not cleaned up.
I use the code for the ‘paho’ test. The first consumer 
"cleanSession=false", using a different clientID to open a producer to send a 
message. Close the producer and consumer, use the same clientID and 
cleanSession = true" to open the second consumer and find that the consumer 
will consume the legacy message in the queue。So I suspect that there is a 
problem with the test code.
I am always looking for why my test results will consume the legacy 
messages in the queue, and your test results will not。
After seeing your information, I re-reviewed the code and found that the 
test code did not have any problems. What is causing my doubts is that because 
of your change, when cleanSession=true, only the MQTTSessionState is cleaned 
up, the queue still exists, and the legacy messages in the queue are consumed 
when resubscribing.
 Can close [#2493 ](https://github.com/apache/activemq-artemis/pull/2493) . 
I think you need to review your change。 


---


Re: Random Access Queues, possible?

2019-01-08 Thread Christopher Shannon
Random access queues don't make sense for a message broker and is not
supported.  Based on your use case Artemis or any message broker does not
sound like the correct product.

It sounds like you need something like one of the many key/value stores
that exist. (you can search around and see what's out there)

On Tue, Jan 8, 2019 at 7:17 AM Andreas Mueller  wrote:

> Hi,
>
> we have a sub project that currently runs within SwiftMQ as a plugin and
> uses SwiftMQ’s Swiftlet API to communicate with the internal components.
> I’m currently evaluating to port it to Artemis where it should run as a
> broker plugin. If that is possible with reasonable effort, we intend to
> make this sub project available as open source.
>
> This library uses queues as kind of database. We do not want to use a real
> database such as JDBC for it because we want it completely broker centric
> without dependencies and we want it transactional consistent, e.g. when a
> HA broker fails over, the data should be transactional save at the new HA
> instance.
>
> To accomplish this, we need random access to queues as specified in this
> little interface:
>
> public interface RandomAccessQueue {
> /**
>  * Returns keys of all messages in this queue.
>  * @return List of keys
>  */
> List getKeys();
>
> /**
>  * Returns a message by its key. The message is not locked.
>  * @param key
>  * @return Message
>  */
> Message getMessageByKey(Object key);
>
> /**
>  * Locks all messages for removal by their key
>  * @param txid Transaction id
>  * @param keys List of keys
>  */
> void lockForRemoval(Object txid, List keys);
>
> /**
>  * Commits a transaction. Removes all messages that are locked in this
> transaction id.
>  * @param txid Transaction id
>  */
> void commit(Object txid);
>
> /**
>  * Aborts a transaction. All messages locked for this transaction are
> simply unlocked.
>  * @param txid Transaction id
>  */
> void abort(Object txid);
> }
>
> I’ve walked through the Artemis docs but did not find a way to do this.
>
> Can anyone tell me if that is possible? If yes, what are the implications
> in terms of performance if I get a message from an arbitrary position of
> the queue and remove it? I want to avoid a full scan of the transaction
> log, for example.
>
> Thanks!
>
> Regards,
> Andreas
>
> --
> Andreas Mueller
> IIT Software GmbH
> http://www.swiftmq.com
>
>
>
>
>
> IIT Software GmbH
> Falkenhorst 11, 48155 Münster, Germany
> Phone: +49 (0)251 39 72 99 00
> Managing Director: Andreas Müller
> District Court: Amtsgericht Münster, HRB 16294
> VAT-No: DE199945912
>
> This e-mail may contain confidential and/or privileged information. If you
> are not the intended recipient (or have received this e-mail in error)
> please notify the sender immediately and destroy this e-mail. Any
> unauthorized copying, disclosure or distribution of the material in this
> e-mail is strictly forbidden.
>
>


[GitHub] activemq-artemis issue #2493: ARTEMIS-2223 when a new consumer is created, n...

2019-01-08 Thread jbertram
Github user jbertram commented on the issue:

https://github.com/apache/activemq-artemis/pull/2493
  
The test is correct. Creating a subscription before attempting to receive 
the message defeats the point of the test as the test is ensuring that the 
session is clean (i.e. no previous subscriptions exist) when reconnecting with 
the same client ID.


---


[GitHub] activemq-artemis issue #2491: ARTEMIS-2217 remove state on clean MQTT sessio...

2019-01-08 Thread jbertram
Github user jbertram commented on the issue:

https://github.com/apache/activemq-artemis/pull/2491
  
The test code is correct. The state kept in an unclean session includes the 
subscriptions which is why the test doesn't create a subscription before trying 
to receive the message. If the broker doesn't properly clean the session state 
when the client connects with cleanSession=true then it will receive a message 
and the test will fail.


---


[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...

2019-01-08 Thread jbertram
Github user jbertram commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2491#discussion_r246075935
  
--- Diff: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 ---
@@ -117,14 +118,11 @@ boolean getStopped() {
}
 
boolean isClean() {
-  return isClean;
+  return clean;
}
 
-   void setIsClean(boolean isClean) throws Exception {
-  this.isClean = isClean;
-  if (isClean) {
- clean();
-  }
+   void setClean(boolean clean) throws Exception {
+  this.clean = clean;
--- End diff --

The expectation of a "setter" method is simply to _set_ a variable and 
nothing more. The additional logic in the `setClean` method is not intuitive 
which is why I removed it and put it in the `getSessionState` method.


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2019-01-08 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r246058995
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 ---
@@ -730,22 +793,29 @@ public int deliverMessage(MessageReference 
messageReference, int deliveryCount,
 
 if (preSettle) {
// Presettled means the client implicitly accepts any 
delivery we send it.
-   sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   try {
+  sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   } catch (Exception e) {
+  log.debug(e.getMessage(), e);
+   }
delivery.settle();
 } else {
sender.advance();
 }
 
 connection.flush();
  } finally {
-connection.unlock();
+synchronized (creditsLock) {
+   pending.decrementAndGet();
+}
+if (releaseRequired) {
+   ((NettyReadable) sendBuffer).getByteBuf().release();
+}
  }
+  } catch (Exception e) {
+ log.warn(e.getMessage(), e);
 
- return size;
-  } finally {
- if (releaseRequired) {
-((NettyReadable) sendBuffer).getByteBuf().release();
- }
+ // important todo: Error treatment
--- End diff --

I'm working on it. I'm out today on a meeting... will be done tomorrow (Wed)


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245974624
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
--- End diff --

I'd suggest creating consumers with priorities out of order (e.g highest in 
middle), so they arent simply registered in sequence, as otherwise a simple 
failure to round-robin delivery attempts (given every receiver has enough 
credit to receive all messages) might also lead to the expected result even 
without any priority handling consideration.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245973668
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
+  AmqpReceiver receiver1 = session.createReceiver(getQueueName(), 
null, false, false, properties1);
+  receiver1.flow(100);
+
+  Map properties2 = new HashMap<>();
+  properties2.put(Symbol.getSymbol("priority"), 10);
+  AmqpReceiver receiver2 = session.createReceiver(getQueueName(), 
null, false, false, properties2);
+  receiver2.flow(100);
+
+  Map properties3 = new HashMap<>();
+  properties3.put(Symbol.getSymbol("priority"), 5);
+  AmqpReceiver receiver3 = session.createReceiver(getQueueName(), 
null, false, false, properties3);
+  receiver3.flow(100);
+
+  sendMessages(getQueueName(), 5);
+
+
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message1 = receiver1.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
--- End diff --

Burning 250ms twice per loop seems excessive. There is a receiveNoWait that 
could be used for initial verification nothing arrived, and/or a small final 
timed wait could be done outside the loop afterwards. Alternatively, 
pullImmediate() would avoid unnecessary waiting.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245968414
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerPriorityTest.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.client;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.tests.util.JMSTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+/**
+ * Exclusive Test
+ */
+public class ConsumerPriorityTest extends JMSTestBase {
+
+   private SimpleString queueName = 
SimpleString.toSimpleString("jms.consumer.priority.queue");
--- End diff --

Rather than hard coding a shared name, using the test name for the queue 
name is nice as it isolates different tests and makes the relationship clear, 
sometimes makes it easier to work on issues later with particular tests. There 
is a test name rule in the parent class, and a getName() method that can be 
used with it.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245955337
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -104,6 +119,11 @@ public void decodeRest(final ActiveMQBuffer buffer) {
   filterString = buffer.readNullableSimpleString();
   browseOnly = buffer.readBoolean();
   requiresResponse = buffer.readBoolean();
+  if (buffer.readableBytes() > 0) {
--- End diff --

I assume this is to allow for old clients that don't send this value. Would 
a more specific version check be clearer here for later reference? Related, I'm 
guessing other changes already made for 2.7.0 have updated the version info 
since it doesn't look to change here?

Also, is the reverse case safe, does an older server failing to read the 
additional value (seemingly always sent now) have potential to lead to any 
issues on older servers, i.e how might the buffer continue to be used later if 
at all? Should the client omit the value for older servers? (Or does the 
presumed version change prevent the new client working with the old server 
anyway? I don't know how that stuff is handled, just commenting from reading 
the diff here).


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245973707
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test various behaviors of AMQP receivers with the broker.
+ */
+public class AmqpReceiverPriorityTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 3)
+   public void testPriority() throws Exception {
+
+  AmqpClient client = createAmqpClient();
+  AmqpConnection connection = addConnection(client.connect());
+  AmqpSession session = connection.createSession();
+
+  Map properties1 = new HashMap<>();
+  properties1.put(Symbol.getSymbol("priority"), 50);
+  AmqpReceiver receiver1 = session.createReceiver(getQueueName(), 
null, false, false, properties1);
+  receiver1.flow(100);
+
+  Map properties2 = new HashMap<>();
+  properties2.put(Symbol.getSymbol("priority"), 10);
+  AmqpReceiver receiver2 = session.createReceiver(getQueueName(), 
null, false, false, properties2);
+  receiver2.flow(100);
+
+  Map properties3 = new HashMap<>();
+  properties3.put(Symbol.getSymbol("priority"), 5);
+  AmqpReceiver receiver3 = session.createReceiver(getQueueName(), 
null, false, false, properties3);
+  receiver3.flow(100);
+
+  sendMessages(getQueueName(), 5);
+
+
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message1 = receiver1.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message3 = receiver3.receive(250, 
TimeUnit.MILLISECONDS);
+ assertNotNull("did not receive message first time", message1);
+ assertEquals("MessageID:" + i, message1.getMessageId());
+ message1.accept();
+ assertNull("message is not meant to goto lower priority 
receiver", message2);
+ assertNull("message is not meant to goto lower priority 
receiver", message3);
+  }
+
+  //Close the high priority receiver
+  receiver1.close();
+
+  sendMessages(getQueueName(), 5);
+
+  //Check messages now goto next priority receiver
+  for (int i = 0; i < 5; i++) {
+ AmqpMessage message2 = receiver2.receive(250, 
TimeUnit.MILLISECONDS);
+ AmqpMessage message3 = receiver3.receive(250, 
TimeUnit.MILLISECONDS);
--- End diff --

As above.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245953999
  
--- Diff: 
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java
 ---
@@ -52,6 +57,7 @@ public String toString() {
   StringBuffer buff = new StringBuffer(getParentString());
   buff.append(", queueName=" + queueName);
   buff.append(", filterString=" + filterString);
+  buff.append(", priority=" + priority);
--- End diff --

Nitpicking, the other details seem to be emitted 'in order' relative to the 
buffer content, so would it make sense to put this at the end consistent with 
its location?


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245965929
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 ---
@@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext 
protonSender,
   return consumer;
}
 
+   private int getPriority(Map properties) {
+  Integer value = properties == null ? null : (Integer) 
properties.get(PRIORITY);
--- End diff --

Comments on the original #2488 PR suggest you want to align with Qpid 
Broker-J in this area. Its support (and the accompanying documentation lift) 
notes as an integral value, so the value here is not necessarily going to be 
the Integer type.


---


[GitHub] activemq-artemis pull request #2490: V2 196

2019-01-08 Thread gemmellr
Github user gemmellr commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2490#discussion_r245972322
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/QueueConsumerPriorityTest.java
 ---
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.openwire.amq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+
+import 
org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Before;
+import org.junit.Test;
+
+public class QueueConsumerPriorityTest extends BasicOpenWireTest {
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+  super.setUp();
+  this.makeSureCoreQueueExist("QUEUE.A");
+   }
+   @Test
+   public void testQueueConsumerPriority() throws JMSException, 
InterruptedException {
+  connection.start();
+  Session consumerLowPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  Session consumerHighPriority = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  assertNotNull(consumerHighPriority);
+  Session senderSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+  String queueName = "QUEUE.A";
+  ActiveMQQueue low = new ActiveMQQueue(queueName + 
"?consumer.priority=1");
+  MessageConsumer lowConsumer = 
consumerLowPriority.createConsumer(low);
+
+  ActiveMQQueue high = new ActiveMQQueue(queueName + 
"?consumer.priority=2");
+  MessageConsumer highConsumer = 
consumerLowPriority.createConsumer(high);
+
+  ActiveMQQueue senderQueue = new ActiveMQQueue(queueName);
+
+  MessageProducer producer = senderSession.createProducer(senderQueue);
+
+  Message msg = senderSession.createTextMessage("test");
+  for (int i = 0; i < 1000; i++) {
+ producer.send(msg);
+ assertNotNull("null on iteration: " + i, 
highConsumer.receive(1000));
+  }
+  assertNull(lowConsumer.receive(2000));
--- End diff --

Would a receiveNoWait (either in or outside the loop) like the other tests 
be nicer than burning 2 seconds? Slow tests is a key reason eventually noone 
wants to runs the tests :)


---


Random Access Queues, possible?

2019-01-08 Thread Andreas Mueller
Hi,

we have a sub project that currently runs within SwiftMQ as a plugin and uses 
SwiftMQ’s Swiftlet API to communicate with the internal components. I’m 
currently evaluating to port it to Artemis where it should run as a broker 
plugin. If that is possible with reasonable effort, we intend to make this sub 
project available as open source.

This library uses queues as kind of database. We do not want to use a real 
database such as JDBC for it because we want it completely broker centric 
without dependencies and we want it transactional consistent, e.g. when a HA 
broker fails over, the data should be transactional save at the new HA instance.

To accomplish this, we need random access to queues as specified in this little 
interface:

public interface RandomAccessQueue {
/**
 * Returns keys of all messages in this queue.
 * @return List of keys
 */
List getKeys();

/**
 * Returns a message by its key. The message is not locked.
 * @param key
 * @return Message
 */
Message getMessageByKey(Object key);

/**
 * Locks all messages for removal by their key
 * @param txid Transaction id
 * @param keys List of keys
 */
void lockForRemoval(Object txid, List keys);

/**
 * Commits a transaction. Removes all messages that are locked in this 
transaction id.
 * @param txid Transaction id
 */
void commit(Object txid);

/**
 * Aborts a transaction. All messages locked for this transaction are 
simply unlocked.
 * @param txid Transaction id
 */
void abort(Object txid);
}

I’ve walked through the Artemis docs but did not find a way to do this. 

Can anyone tell me if that is possible? If yes, what are the implications in 
terms of performance if I get a message from an arbitrary position of the queue 
and remove it? I want to avoid a full scan of the transaction log, for example.

Thanks!

Regards,
Andreas

-- 
Andreas Mueller
IIT Software GmbH
http://www.swiftmq.com





IIT Software GmbH
Falkenhorst 11, 48155 Münster, Germany
Phone: +49 (0)251 39 72 99 00
Managing Director: Andreas Müller
District Court: Amtsgericht Münster, HRB 16294
VAT-No: DE199945912

This e-mail may contain confidential and/or privileged information. If you are 
not the intended recipient (or have received this e-mail in error) please 
notify the sender immediately and destroy this e-mail. Any unauthorized 
copying, disclosure or distribution of the material in this e-mail is strictly 
forbidden.



[GitHub] activemq-artemis issue #2492: ARTEMIS-2222 why the position remains unchange...

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2492
  
This had a specific purpose so we ignore deleted or completed pages.

As per comment in code.

// any deleted or complete page will be ignored on the 
moveNextPage, we will just keep going


---


[GitHub] activemq-artemis pull request #2467: ARTEMIS-2205 Performance improvements o...

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2467#discussion_r245960681
  
--- Diff: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 ---
@@ -730,22 +793,29 @@ public int deliverMessage(MessageReference 
messageReference, int deliveryCount,
 
 if (preSettle) {
// Presettled means the client implicitly accepts any 
delivery we send it.
-   sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   try {
+  sessionSPI.ack(null, brokerConsumer, 
messageReference.getMessage());
+   } catch (Exception e) {
+  log.debug(e.getMessage(), e);
+   }
delivery.settle();
 } else {
sender.advance();
 }
 
 connection.flush();
  } finally {
-connection.unlock();
+synchronized (creditsLock) {
+   pending.decrementAndGet();
+}
+if (releaseRequired) {
+   ((NettyReadable) sendBuffer).getByteBuf().release();
+}
  }
+  } catch (Exception e) {
+ log.warn(e.getMessage(), e);
 
- return size;
-  } finally {
- if (releaseRequired) {
-((NettyReadable) sendBuffer).getByteBuf().release();
- }
+ // important todo: Error treatment
--- End diff --

Did you look over this? 


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-08 Thread franz1981
Github user franz1981 commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
FYI: we are working to fix the CI machines now so I won't be able ATM to 
run any job on it :(
I hope to have the CI machines up and running soon to continue :+1: 


---


[GitHub] activemq-artemis issue #2491: ARTEMIS-2217 remove state on clean MQTT sessio...

2019-01-08 Thread onlyMIT
Github user onlyMIT commented on the issue:

https://github.com/apache/activemq-artemis/pull/2491
  
@jbertram  I think I found out why your solution passed the test, your 
test was constructed on a wrong test code. the solution your provided, after I 
tested it with the revised test code, the test did not pass.
The test issue, i created a jira and opened a  
[#2493](https://github.com/apache/activemq-artemis/pull/2493) to solve this 
test issue. 
we need it, calling "clean()" in the "setIsClean(boolean isClean)" method.


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-08 Thread qihongxu
Github user qihongxu commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  



> @qihongxu @michaelandrepearce
> I'm running now a CI job: it will take some time, but when it will be 
fine I will merge this 
> @qihongxu After all the relevant bits re paging will be merged I will 
send another PR with the same 2 commits I have sent to your branch: are you 
available to give some help to check the effects of that PR on your tests?


> @qihongxu big thanks for all the effort on this!! And providing the 
testing time


My pleasure :) We are glad to see any boost in perf, especially on paging 
mode.

I will keep a close watch on the new PR you mentioned and ran some more 
tests as we have done in this issue if needed.

Also thank you all for reviews and works on this PR! 


---


[GitHub] activemq-artemis pull request #2493: ARTEMIS-2223 when a new consumer is cre...

2019-01-08 Thread onlyMIT
GitHub user onlyMIT opened a pull request:

https://github.com/apache/activemq-artemis/pull/2493

ARTEMIS-2223 when a new consumer is created, no subscription is called.

In the 'MQTTTest.testCleanSession()' test method, when a new consumer is 
created, no subscription is called.Consumers need to subscribe before they 
consume the news.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/onlyMIT/activemq-artemis ARTEMIS-2223

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2493.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2493


commit 2a6d230c4c8cd739090c5da58754a005d62e1a40
Author: onlyMIT 
Date:   2019-01-08T09:39:37Z

ARTEMIS-2223 when a new consumer is created, no subscription is called.

In the 'MQTTTest.testCleanSession()' test method, when a new consumer is 
created, no subscription is called.Consumers need to subscribe before they 
consume the news.




---


[GitHub] activemq-artemis issue #2492: ARTEMIS-2222 why the position remains unchange...

2019-01-08 Thread CNNJYB
Github user CNNJYB commented on the issue:

https://github.com/apache/activemq-artemis/pull/2492
  
> It is correct that the first commit is related to ARTEMIS-2144?

Branch is not latest. Now updated.


---


[GitHub] activemq-artemis issue #2492: ARTEMIS-2222 why the position remains unchange...

2019-01-08 Thread franz1981
Github user franz1981 commented on the issue:

https://github.com/apache/activemq-artemis/pull/2492
  
It is correct that the first commit is related to ARTEMIS-2144?


---


[GitHub] activemq-artemis pull request #2492: ARTEMIS-2222 why the position remains u...

2019-01-08 Thread CNNJYB
GitHub user CNNJYB opened a pull request:

https://github.com/apache/activemq-artemis/pull/2492

ARTEMIS- why the position remains unchanged if ignored is set to true

I am a bit confused about this, When CursorIterator:next is called during 
queue depage, if ignored is set to true, why the position remains unchanged.
  
if (!ignored) {
position = message.getPosition();
}   
For example, the client sends some messages to the topic subscriber ta 
(this topic has two subscribers ta and tb), every time tb depage continuous 
PagePositions that ignored are set to true will be traversed again.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/CNNJYB/activemq-artemis 
dev-CursorIterator-moveNext-ignored

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/activemq-artemis/pull/2492.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2492


commit 49f8bcd99640209c825c22e54a32c872850cc000
Author: andytaylor 
Date:   2018-10-24T10:21:52Z

ARTEMIS-2144 - tx begin failure in ra doesn't get cleaned up

https://issues.apache.org/jira/browse/ARTEMIS-2144

commit 7ae39f7d7218e7c5a93eaecb8f24af3d6186e6f3
Author: yb <17061955@...>
Date:   2019-01-08T09:14:18Z

ARTEMIS- why the position remains unchanged if ignored is set to true




---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-08 Thread michaelandrepearce
Github user michaelandrepearce commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
@qihongxu big thanks for all the effort on this!!


---


[GitHub] activemq-artemis issue #2484: ARTEMIS-2216 Use a specific executor for pageS...

2019-01-08 Thread franz1981
Github user franz1981 commented on the issue:

https://github.com/apache/activemq-artemis/pull/2484
  
@qihongxu @michaelandrepearce 
I'm running now a CI job: it will take some time, but when it will be fine 
I will merge this :+1: 
@qihongxu After all the relevant bits re paging will be merged I will send 
another PR with the same 2 commits I have sent to your branch: are you 
available to give some help to check the effects of that PR on your tests? 


---