[1/2] activemq-artemis git commit: ARTEMIS-1509 Add support for JdbcNodeManager into the NettyFailoverTest

2017-11-09 Thread clebertsuconic
Repository: activemq-artemis
Updated Branches:
  refs/heads/master d03c4c8cc -> d94efe044


ARTEMIS-1509 Add support for JdbcNodeManager into the NettyFailoverTest


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8e8a6f0f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8e8a6f0f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8e8a6f0f

Branch: refs/heads/master
Commit: 8e8a6f0faf121a03b07b1baa23f5d91ec1cecdc3
Parents: d03c4c8
Author: Francesco Nigro 
Authored: Tue Nov 7 17:44:57 2017 +0100
Committer: Francesco Nigro 
Committed: Wed Nov 8 14:52:57 2017 +0100

--
 .../cluster/failover/FailoverTest.java  | 25 --
 .../cluster/failover/FailoverTestBase.java  | 19 -
 .../cluster/failover/NettyFailoverTest.java | 85 
 3 files changed, 120 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e8a6f0f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
--
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index c5954cf..dbeeec3 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -111,7 +111,9 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailover() throws Exception {
   
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
 
-  ((InVMNodeManager) nodeManager).failoverPause = 500;
+  if (nodeManager instanceof InVMNodeManager) {
+ ((InVMNodeManager) nodeManager).failoverPause = 500L;
+  }
 
   ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) 
createSessionFactory(locator);
 
@@ -176,7 +178,9 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailoverConsume() throws Exception {
   
locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(300).setRetryInterval(100).setAckBatchSize(0);
 
-  ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+  if (nodeManager instanceof InVMNodeManager) {
+ ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+  }
 
   ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) 
createSessionFactory(locator);
 
@@ -237,7 +241,9 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailoverConsumeBlocked() throws Exception {
   
locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setConsumerWindowSize(0).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(-1).setAckBatchSize(0);
 
-  ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+  if (nodeManager instanceof InVMNodeManager) {
+ ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+  }
 
   ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) 
createSessionFactory(locator);
 
@@ -330,7 +336,9 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailoverTransactionCommit() throws Exception {
   
locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
 
-  ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+  if (nodeManager instanceof InVMNodeManager) {
+ ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+  }
 
   ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) 
createSessionFactory(locator);
 
@@ -397,12 +405,13 @@ public class FailoverTest extends FailoverTestBase {
public void testTimeoutOnFailoverTransactionCommitTimeoutCommunication() 
throws Exception {
   
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(500);
 
-  ((InVMNodeManager) nodeManager).failoverPause = 6000L;
+  if (nodeManager instanceof InVMNodeManager) {
+ ((InVMNodeManager) nodeManager).failoverPause = 6000L;
+  }
 
   ClientSessionFactoryInternal sf1 = (ClientSessionFactoryIntern

[2/2] activemq-artemis git commit: This closes #1651

2017-11-09 Thread clebertsuconic
This closes #1651


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d94efe04
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d94efe04
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d94efe04

Branch: refs/heads/master
Commit: d94efe0442d64b60b1c17456c88c41c1e61481af
Parents: d03c4c8 8e8a6f0
Author: Clebert Suconic 
Authored: Thu Nov 9 11:16:00 2017 -0500
Committer: Clebert Suconic 
Committed: Thu Nov 9 11:16:00 2017 -0500

--
 .../cluster/failover/FailoverTest.java  | 25 --
 .../cluster/failover/FailoverTestBase.java  | 19 -
 .../cluster/failover/NettyFailoverTest.java | 85 
 3 files changed, 120 insertions(+), 9 deletions(-)
--




[1/2] activemq-artemis git commit: This closes #1646

2017-11-09 Thread clebertsuconic
Repository: activemq-artemis
Updated Branches:
  refs/heads/master d94efe044 -> 93f4e41be


This closes #1646


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/93f4e41b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/93f4e41b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/93f4e41b

Branch: refs/heads/master
Commit: 93f4e41beb90a6073cd8b79cb00fc2f3073112dd
Parents: d94efe0 949f21a
Author: Clebert Suconic 
Authored: Thu Nov 9 11:18:48 2017 -0500
Committer: Clebert Suconic 
Committed: Thu Nov 9 11:18:48 2017 -0500

--
 RELEASING.md | 106 +++---
 1 file changed, 102 insertions(+), 4 deletions(-)
--




[2/2] activemq-artemis git commit: NO-JIRA update release documentation

2017-11-09 Thread clebertsuconic
NO-JIRA update release documentation


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/949f21a9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/949f21a9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/949f21a9

Branch: refs/heads/master
Commit: 949f21a923b9a17eb2c2cd9114eaac3d66894dbd
Parents: d94efe0
Author: Justin Bertram 
Authored: Mon Nov 6 14:27:17 2017 -0600
Committer: Clebert Suconic 
Committed: Thu Nov 9 11:18:48 2017 -0500

--
 RELEASING.md | 106 +++---
 1 file changed, 102 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/949f21a9/RELEASING.md
--
diff --git a/RELEASING.md b/RELEASING.md
index 3f35aed..5218dfc 100644
--- a/RELEASING.md
+++ b/RELEASING.md
@@ -21,7 +21,7 @@ Things to do before issuing a new release:
 5. Check the manuals have been created properly
 6. Check the javadocs are created correctly (including the diagrams)
 
-* If every thing is successful follow these next steps to build and publish 
artifacts to Nexus and send out a release vote.
+If everything is successful follow these next steps to build and publish 
artifacts to Nexus and send out a release vote.
 
 ## Key to Sign the Release
 
@@ -164,6 +164,7 @@ svn commit
 
 Old staged releases can be cleaned out periodically.
 
+
 ## Locate Release Notes
 
 1. Go to the "Releases" page for the Artemis JIRA project: 
https://issues.apache.org/jira/projects/ARTEMIS?selectedItem=com.atlassian.jira.jira-projects-plugin:release-page
@@ -171,6 +172,7 @@ Old staged releases can be cleaned out periodically.
 3. Click the "Release Notes" link near the top of the page.
 4. Grab the URL to put into the VOTE email.
 
+
 ## Send Email
 
 Once all the artifacts are stage then send an email to 
`d...@activemq.apache.org`.  It should have a subject like `[VOTE] 
@@ -210,15 +212,53 @@ Here's my +1
 ```
 
 
+## Voting process
+
+Rules for the Apache voting process are stipulated 
[here](https://www.apache.org/foundation/voting.html).
+
+Assuming the vote is successful send a email with a subject like `[RESULT] 
[VOTE] Apache ActiveMQ Artemis ` 
+informing the list about the voting results, e.g.:
+
+```
+Results of the Apache ActiveMQ Artemis  release vote.
+
+Vote passes with 2 +1 binding votes.
+
+The following votes were received:
+
+Binding:
++1 John Doe
++1 Bill Smith
+
+Non Binding:
++1 Mike Williams
+
+Thank you to everyone who contributed and took the time to review the
+release candidates and vote.
+
+I'll move forward with the getting the release out and updating the
+relevant documentation.
+
+Regards
+```
+
+
 ## Promote artifacts to the dist release area
 
 After a successful vote, populate the dist release area using the staged
-files from the dist dev area to allow them to mirror.
+files from the dist dev area to allow them to mirror. Note: this can only
+be done by a PMC member.
 
 ```sh
 svn cp -m "add files for activemq-artemis-${CURRENT-RELEASE}" 
https://dist.apache.org/repos/dist/dev/activemq/activemq-artemis/${CURRENT-RELEASE}
 
https://dist.apache.org/repos/dist/release/activemq/activemq-artemis/${CURRENT-RELEASE}
 ```
-It can take up to 24hrs for there to be good mirror coverage. Mirror status 
can be viewed at https://www.apache.org/mirrors/.
+
+Good mirror coverage can take up to 24 hours. Mirror status can be viewed 
[here](https://www.apache.org/mirrors/).
+
+
+## Release the staging repo
+
+Go to https://repository.apache.org/#stagingRepositories and click the 
"Release" button.
 
 
 ## Web site update:
@@ -229,6 +269,63 @@ Make sure you get a copy of the website at:
 svn co 
https://svn.apache.org/repos/infra/websites/production/activemq/content/artemis/
 ```
 
+Once the mirrors are up-to-date then update the following:
+1. Copy release-notes-.html to release-notes-.html.
+2. Update release-notes-.html. Delete the existing list of bugs, 
features, improvements, etc. and replace it
+   with the HTML from the bottom of the release notes link you sent out with 
your VOTE email.
+3. Update past-releases.html. Copy the block of HTML dealing with the 
2nd-to-last release, paste it above the original, 
+   and modify the version numbers for the last release.
+4. Update download.html. Modify the block of HTML dealing with the last 
release so that the version numbers are for
+   the new release.
+5. Copy docs/latest to docs/.
+6. Create docs/latest and copy these files into it:
+1. contents of user-manual from 
+2. book.pdf version of user-manual (generated with `gitbook pdf`)
+3. book.epub version of user-manual (generated with `gitbook epub`)
+4. book.mobi version of user-manual (generate

activemq git commit: AMQ-6858 - Fix several durable subscription bridge propagation issues

2017-11-09 Thread cshannon
Repository: activemq
Updated Branches:
  refs/heads/master 7dad09a9c -> 96ce14b27


AMQ-6858 - Fix several durable subscription bridge propagation issues

Durable network proxy subs will now be properly created across multiple
bridges when 3 or more brokers are used.  Demand will be properly synced
and removed.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/96ce14b2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/96ce14b2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/96ce14b2

Branch: refs/heads/master
Commit: 96ce14b278fef9e5f428f1c3c07ce5c09fd8f9a8
Parents: 7dad09a
Author: Christopher L. Shannon (cshannon) 
Authored: Mon Nov 6 08:42:03 2017 -0500
Committer: Christopher L. Shannon (cshannon) 
Committed: Thu Nov 9 11:21:18 2017 -0500

--
 .../apache/activemq/network/ConduitBridge.java  |  16 +-
 .../network/DemandForwardingBridgeSupport.java  |  49 +++-
 .../activemq/network/DurableConduitBridge.java  |  19 +-
 .../network/NetworkBridgeConfiguration.java |  13 +
 .../DurableThreeBrokerNetworkBridgeTest.java| 241 +++
 .../VerifyNetworkConsumersDisconnectTest.java   |  68 +-
 6 files changed, 384 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java 
b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
index 3c0b85b..6ced896 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
@@ -56,12 +56,16 @@ public class ConduitBridge extends DemandForwardingBridge {
 }
 
 protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info, 
boolean isForcedDurable) {
-// search through existing subscriptions and see if we have a match
-if (info.isNetworkSubscription()) {
+//If a network subscription and a queue check if 
isConduitNetworkQueueSubscriptions is true
+//If true then we want to try and conduit
+//For topics we always want to conduit regardless of network 
subscription or not
+if (info.isNetworkSubscription() && info.getDestination().isQueue() &&
+!configuration.isConduitNetworkQueueSubscriptions()) {
 return false;
 }
 boolean matched = false;
 
+// search through existing subscriptions and see if we have a match
 for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
 DestinationFilter filter = 
DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
 if (canConduit(ds) && filter.matches(info.getDestination())) {
@@ -86,9 +90,13 @@ public class ConduitBridge extends DemandForwardingBridge {
 }
 
 // we want to conduit statically included consumers which are local 
networkSubs
-// but we don't want to conduit remote network subs i.e. (proxy proxy) 
consumers
+// but we don't want to conduit remote network queue subs i.e. (proxy 
proxy) consumers
+// unless isConduitNetworkQueueSubscriptions is true
+// We always want to conduit topic subscriptions
 private boolean canConduit(DemandSubscription ds) {
-return ds.isStaticallyIncluded() || 
!ds.getRemoteInfo().isNetworkSubscription();
+return ds.isStaticallyIncluded() || 
ds.getRemoteInfo().getDestination().isTopic() ||
+!ds.getRemoteInfo().isNetworkSubscription() ||
+(ds.getRemoteInfo().getDestination().isQueue() && 
configuration.isConduitNetworkQueueSubscriptions());
 }
 
 @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 879ab39..efdfa5a 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -661,6 +661,16 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
 }
 }
 
+/**
+ * Checks whether or not this consumer is a direct bridge network 
subscription
+ * @param info
+ * @return
+ */
+protected boolean isBridgeNS(ConsumerIn

[1/2] activemq-artemis git commit: This closes #1648

2017-11-09 Thread clebertsuconic
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 93f4e41be -> c2a21c974


This closes #1648


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c2a21c97
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c2a21c97
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c2a21c97

Branch: refs/heads/master
Commit: c2a21c9743dca8ab38bb0ebffbbf74a5250756d7
Parents: 93f4e41 5cc8fae
Author: Clebert Suconic 
Authored: Thu Nov 9 11:52:44 2017 -0500
Committer: Clebert Suconic 
Committed: Thu Nov 9 11:52:44 2017 -0500

--
 .../core/client/impl/ClientSessionImpl.java | 31 +---
 1 file changed, 21 insertions(+), 10 deletions(-)
--




[2/2] activemq-artemis git commit: ARTEMIS-1506 Synchronization issue during failover in ClientSessionImpl

2017-11-09 Thread clebertsuconic
ARTEMIS-1506 Synchronization issue during failover in ClientSessionImpl

The temporary deadlock is avoided by removing 'synchronized' from
ClientSessionImpl::getCredits method. As the method uses only
a producerCreditManger, only this object is guarded against
the parallel access.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5cc8faed
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5cc8faed
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5cc8faed

Branch: refs/heads/master
Commit: 5cc8faedd8fd2e80975c2aa12f8f30c1724b9626
Parents: 93f4e41
Author: Erich Duda 
Authored: Fri Nov 3 16:19:54 2017 +0100
Committer: Clebert Suconic 
Committed: Thu Nov 9 11:52:44 2017 -0500

--
 .../core/client/impl/ClientSessionImpl.java | 31 +---
 1 file changed, 21 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5cc8faed/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
--
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 41330a6..61784ad 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -1156,7 +1156,7 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
   try {
  closeChildren();
 
- synchronized (this) {
+ synchronized (producerCreditManager) {
 producerCreditManager.close();
  }
  inClose = true;
@@ -1177,7 +1177,9 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
  return;
   }
 
-  producerCreditManager.close();
+  synchronized (producerCreditManager) {
+ producerCreditManager.close();
+  }
 
   cleanUpChildren();
 
@@ -1282,7 +1284,9 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
  }
 
  if (resetCreditManager) {
-producerCreditManager.reset();
+synchronized (producerCreditManager) {
+   producerCreditManager.reset();
+}
 
 // Also need to send more credits for consumers, otherwise the 
system could hand with the server
 // not having any credits to send
@@ -1343,25 +1347,32 @@ public final class ClientSessionImpl implements 
ClientSessionInternal, FailureLi
}
 
@Override
-   public synchronized ClientProducerCredits getCredits(final SimpleString 
address, final boolean anon) {
-  ClientProducerCredits credits = 
producerCreditManager.getCredits(address, anon, sessionContext);
-
-  return credits;
+   public ClientProducerCredits getCredits(final SimpleString address, final 
boolean anon) {
+  synchronized (producerCreditManager) {
+ ClientProducerCredits credits = 
producerCreditManager.getCredits(address, anon, sessionContext);
+ return credits;
+  }
}
 
@Override
public void returnCredits(final SimpleString address) {
-  producerCreditManager.returnCredits(address);
+  synchronized (producerCreditManager) {
+ producerCreditManager.returnCredits(address);
+  }
}
 
@Override
public void handleReceiveProducerCredits(final SimpleString address, final 
int credits) {
-  producerCreditManager.receiveCredits(address, credits);
+  synchronized (producerCreditManager) {
+ producerCreditManager.receiveCredits(address, credits);
+  }
}
 
@Override
public void handleReceiveProducerFailCredits(final SimpleString address, 
int credits) {
-  producerCreditManager.receiveFailCredits(address, credits);
+  synchronized (producerCreditManager) {
+ producerCreditManager.receiveFailCredits(address, credits);
+  }
}
 
@Override



[4/7] activemq-artemis git commit: ARTEMIS-1495 Test simulating a dead lock on queue auto create under stress

2017-11-09 Thread clebertsuconic
ARTEMIS-1495 Test simulating a dead lock on queue auto create under stress


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8bf879f1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8bf879f1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8bf879f1

Branch: refs/heads/master
Commit: 8bf879f1560b958907bf8f6808bc66b8f2402431
Parents: c2a21c9
Author: Francesco Nigro 
Authored: Thu Nov 2 10:51:43 2017 +0100
Committer: Clebert Suconic 
Committed: Thu Nov 9 11:58:36 2017 -0500

--
 .../tests/integration/client/ConsumerTest.java  | 120 +++
 1 file changed, 120 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8bf879f1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
--
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index af172c8..9c05114 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -20,6 +20,7 @@ import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -31,9 +32,13 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.stream.Stream;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@@ -1074,4 +1079,119 @@ public class ConsumerTest extends ActiveMQTestBase {
   session.close();
}
 
+   @Test
+   public void testMultipleConsumersOnSharedQueue() throws Throwable {
+  if (!isNetty() || this.durable) {
+ return;
+  }
+  final boolean durable = false;
+  final long TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
+  final int forks = 100;
+  final int queues = forks;
+  final int runs = 1;
+  final int messages = 1;
+  final ConnectionFactory factorySend = createFactory(1);
+  final AtomicLongArray receivedMessages = new AtomicLongArray(forks);
+  final Thread[] producersRunners = new Thread[forks];
+  final Thread[] consumersRunners = new Thread[forks];
+  //parties are forks (1 producer 1 consumer) + 1 controller in the main 
test thread
+  final CyclicBarrier onStartRun = new CyclicBarrier((forks * 2) + 1);
+  final CyclicBarrier onFinishRun = new CyclicBarrier((forks * 2) + 1);
+
+  final int messagesSent = forks * messages;
+  final AtomicInteger messagesRecieved = new AtomicInteger(0);
+
+  for (int i = 0; i < forks; i++) {
+ final int forkIndex = i;
+ final String queueName = "q_" + (forkIndex % queues);
+ final Thread producerRunner = new Thread(() -> {
+try (Connection connection = factorySend.createConnection()) {
+   connection.start();
+   try (Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE)) {
+  final javax.jms.Queue queue = session.createQueue(queueName);
+  try (MessageProducer producer = 
session.createProducer(queue)) {
+ producer.setDeliveryMode(durable ? 
DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ for (int r = 0; r < runs; r++) {
+onStartRun.await();
+for (int m = 0; m < messages; m++) {
+   final BytesMessage bytesMessage = 
session.createBytesMessage();
+   bytesMessage.writeInt(forkIndex);
+   producer.send(bytesMessage);
+}
+onFinishRun.await();
+ }
+  } catch (InterruptedException | BrokenBarrierException e) {
+ e.printStackTrace();
+  }
+   }
+} catch (JMSException e) {
+   e.printStackTrace();
+}

[2/7] activemq-artemis git commit: ARTEMIS-1495 Fixing In Handler executor and added benchmark to measure impact of changes

2017-11-09 Thread clebertsuconic
ARTEMIS-1495 Fixing In Handler executor and added benchmark to measure impact 
of changes


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/91db0807
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/91db0807
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/91db0807

Branch: refs/heads/master
Commit: 91db08072b221885f246a9db70abf3ee0bdf170d
Parents: 0fadc68
Author: Clebert Suconic 
Authored: Wed Nov 8 09:16:59 2017 -0500
Committer: Clebert Suconic 
Committed: Thu Nov 9 11:58:36 2017 -0500

--
 .../artemis/utils/actors/ArtemisExecutor.java   |  10 +-
 .../artemis/utils/actors/HandlerBase.java   |  47 ++
 .../artemis/utils/actors/ProcessorBase.java | 153 +--
 .../utils/actors/OrderedExecutorSanityTest.java |  69 -
 .../core/ServerSessionPacketHandler.java|  73 +++--
 .../artemis/tests/util/ActiveMQTestBase.java|   2 +-
 .../tests/integration/client/ConsumerTest.java  |   6 +-
 7 files changed, 260 insertions(+), 100 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
--
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
index 5e72ef2..8efb3d3 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
@@ -17,6 +17,8 @@
 
 package org.apache.activemq.artemis.utils.actors;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -40,9 +42,15 @@ public interface ArtemisExecutor extends Executor {
 
/** It will wait the current execution (if there is one) to finish
 *  but will not complete any further executions */
-   default void shutdownNow() {
+   default List shutdownNow() {
+  return Collections.emptyList();
}
 
+
+   default void shutdown() {
+   }
+
+
/**
 * This will verify if the executor is flushed with no wait (or very 
minimal wait if not the {@link 
org.apache.activemq.artemis.utils.actors.OrderedExecutor}
 * @return

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/91db0807/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java
--
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java
new file mode 100644
index 000..6bfbcb4
--- /dev/null
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/HandlerBase.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2005-2014 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.actors;
+
+/**
+ * This abstract class will encapsulate
+ * ThreadLocals to determine when a class is a handler.
+ * This is because some functionality has to be avoided if inHandler().
+ *
+ */
+public abstract class HandlerBase {
+
+   //marker instance used to recognize if a thread is performing a packet 
handling
+   private static final Object DUMMY = Boolean.TRUE;
+
+   // this cannot be static as the Actor will be used within another executor. 
For that reason
+   // each instance will have its own ThreadLocal.
+   // ... a thread that has its thread-local map populated with DUMMY while 
performing a handler
+   private final ThreadLocal inHandler = new ThreadLocal<>();
+
+   protected void enter() {
+  assert inHandler.get() == null : "should be null";
+  inHandler.set(DUMMY);
+   }
+
+   public boolean inHandler() {
+  final Object dummy = inHandler.get();
+  return dummy != null;
+   }
+
+   protected void leave() {
+  assert inHandler.get() != null : "marker not set"

[5/7] activemq-artemis git commit: ARTEMIS-1495 Sanity tests for the ProcessorBase::shutdownNow feature

2017-11-09 Thread clebertsuconic
ARTEMIS-1495 Sanity tests for the ProcessorBase::shutdownNow feature


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3c5b57f1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3c5b57f1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3c5b57f1

Branch: refs/heads/master
Commit: 3c5b57f1e9ed2f3b71df9b38e9bcbd3be7e31d0a
Parents: 2e6176a
Author: Francesco Nigro 
Authored: Wed Nov 8 10:05:35 2017 +0100
Committer: Clebert Suconic 
Committed: Thu Nov 9 11:58:36 2017 -0500

--
 .../artemis/utils/actors/ProcessorBase.java | 10 +++
 .../utils/actors/OrderedExecutorSanityTest.java | 81 
 2 files changed, 91 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c5b57f1/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
--
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 44b2916..8d19c22 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -114,4 +114,14 @@ public abstract class ProcessorBase {
   }
}
 
+   /**
+* Returns the remaining items to be processed.
+* 
+* This method is safe to be called by different threads and its accuracy 
is subject to concurrent modifications.
+* It is meant to be used only for test purposes, because of its {@code 
O(n)} cost.
+*/
+   public final int remaining() {
+  return tasks.size();
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c5b57f1/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
--
diff --git 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
new file mode 100644
index 000..9446f50
--- /dev/null
+++ 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/OrderedExecutorSanityTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.actors;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class OrderedExecutorSanityTest {
+
+   @Test
+   public void shouldExecuteTasksInOrder() throws InterruptedException {
+  final int threads = 3;
+  final int tasks = 100;
+  final long timeoutMillis = TimeUnit.SECONDS.toMillis(10);
+  final ExecutorService executorService = 
Executors.newFixedThreadPool(threads);
+  try {
+ final ArtemisExecutor executor = new OrderedExecutor(executorService);
+ //it can be not thread safe too
+ final List results = new ArrayList<>(tasks);
+ final List expectedResults = new ArrayList<>(tasks);
+ final CountDownLatch executed = new CountDownLatch(tasks);
+ for (int i = 0; i < tasks; i++) {
+final int value = i;
+executor.execute(() -> {
+   results.add(value);
+   executed.countDown();
+});
+expectedResults.add(value);
+ }
+ Assert.assertTrue("The tasks must be executed in " + timeoutMillis + 
" ms", executed.await(timeoutMillis, TimeUnit.MILLISECONDS));
+ Assert.assertArrayEquals("The processing of tasks must be ordered", 
expectedResults.toArray(), results.toArray());
+  } fina

[1/7] activemq-artemis git commit: This closes #1650

2017-11-09 Thread clebertsuconic
Repository: activemq-artemis
Updated Branches:
  refs/heads/master c2a21c974 -> ead60d54d


This closes #1650


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ead60d54
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ead60d54
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ead60d54

Branch: refs/heads/master
Commit: ead60d54d094e470033b65074e85fa143caf2300
Parents: c2a21c9 33b3eb6
Author: Clebert Suconic 
Authored: Thu Nov 9 11:58:36 2017 -0500
Committer: Clebert Suconic 
Committed: Thu Nov 9 11:58:36 2017 -0500

--
 .../artemis/utils/actors/ArtemisExecutor.java   |  44 +++--
 .../artemis/utils/actors/HandlerBase.java   |  47 +
 .../artemis/utils/actors/ProcessorBase.java | 196 +++
 .../utils/actors/OrderedExecutorSanityTest.java | 148 ++
 .../cursor/impl/PageCursorProviderImpl.java |   2 +-
 .../core/paging/impl/PagingStoreImpl.java   |   2 +-
 .../core/ServerSessionPacketHandler.java|  83 +++-
 .../protocol/core/impl/CoreSessionCallback.java |   2 +-
 .../management/impl/ManagementServiceImpl.java  |   4 +-
 .../artemis/tests/util/ActiveMQTestBase.java|   2 +-
 .../tests/integration/client/ConsumerTest.java  | 126 +++-
 .../jms/consumer/JmsConsumerTest.java   |  11 +-
 12 files changed, 542 insertions(+), 125 deletions(-)
--




[3/7] activemq-artemis git commit: ARTEMIS-1495 Removing flushes from codebase

2017-11-09 Thread clebertsuconic
ARTEMIS-1495 Removing flushes from codebase

Instead of flushing we just need to make sure there are no more calls into
page executors as we stop the PageManager.

This will avoid any possible starvations or deadlocks here.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2e6176a6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2e6176a6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2e6176a6

Branch: refs/heads/master
Commit: 2e6176a6967c165ed463437d6302f3f8da9e
Parents: 8bf879f
Author: Clebert Suconic 
Authored: Tue Nov 7 14:52:19 2017 -0500
Committer: Clebert Suconic 
Committed: Thu Nov 9 11:58:36 2017 -0500

--
 .../artemis/utils/actors/ArtemisExecutor.java   | 23 +++
 .../artemis/utils/actors/ProcessorBase.java | 68 +---
 .../cursor/impl/PageCursorProviderImpl.java |  2 +-
 .../core/paging/impl/PagingStoreImpl.java   |  2 +-
 .../core/ServerSessionPacketHandler.java| 14 ++--
 .../protocol/core/impl/CoreSessionCallback.java |  2 +-
 .../management/impl/ManagementServiceImpl.java  |  4 +-
 .../jms/consumer/JmsConsumerTest.java   | 11 ++--
 8 files changed, 58 insertions(+), 68 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
--
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
index d3036ec..5e72ef2 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
@@ -25,7 +25,7 @@ public interface ArtemisExecutor extends Executor {
 
/**
 * Artemis is supposed to implement this properly, however in tests or tools
-* this can be used as a fake, doing a sipmle delegate and using the 
default methods implemented here.
+* this can be used as a fake, doing a simple delegate and using the 
default methods implemented here.
 * @param executor
 * @return
 */
@@ -38,11 +38,16 @@ public interface ArtemisExecutor extends Executor {
   };
}
 
-   default boolean flush() {
-  return flush(30, TimeUnit.SECONDS);
+   /** It will wait the current execution (if there is one) to finish
+*  but will not complete any further executions */
+   default void shutdownNow() {
}
 
-   default boolean flush(long timeout, TimeUnit unit) {
+   /**
+* This will verify if the executor is flushed with no wait (or very 
minimal wait if not the {@link 
org.apache.activemq.artemis.utils.actors.OrderedExecutor}
+* @return
+*/
+   default boolean isFlushed() {
   CountDownLatch latch = new CountDownLatch(1);
   Runnable runnable = new Runnable() {
  @Override
@@ -52,18 +57,10 @@ public interface ArtemisExecutor extends Executor {
   };
   execute(runnable);
   try {
- return latch.await(timeout, unit);
+ return latch.await(100, TimeUnit.MILLISECONDS);
   } catch (InterruptedException e) {
  return false;
   }
}
 
-   /**
-* This will verify if the executor is flushed with no wait (or very 
minimal wait if not the {@link 
org.apache.activemq.artemis.utils.actors.OrderedExecutor}
-* @return
-*/
-   default boolean isFlushed() {
-  return flush(100, TimeUnit.MILLISECONDS);
-   }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2e6176a6/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
--
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index dbc0776..44b2916 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -20,7 +20,6 @@ package org.apache.activemq.artemis.utils.actors;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 public abstract class ProcessorBase {
@@ -34,6 +33,9 @@ public abstract class ProcessorBase {
 
private final ExecutorTask task = new ExecutorTask();
 
+   private final Object startedGuard = new Objec

[6/7] activemq-artemis git commit: ARTEMIS-1495 Few perf improvements to: - reduce volatile loads - allow method inlining for hot execution paths - reduced pointers chasing due to inner classes uses

2017-11-09 Thread clebertsuconic
ARTEMIS-1495 Few perf improvements to:
 - reduce volatile loads
 - allow method inlining for hot execution paths
 - reduced pointers chasing due to inner classes uses


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/33b3eb6f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/33b3eb6f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/33b3eb6f

Branch: refs/heads/master
Commit: 33b3eb6f095da4a21648c268c7a960e55f414ca3
Parents: 91db080
Author: Francesco Nigro 
Authored: Thu Nov 9 11:26:21 2017 +0100
Committer: Clebert Suconic 
Committed: Thu Nov 9 11:58:36 2017 -0500

--
 .../artemis/utils/actors/ArtemisExecutor.java   |  25 +++-
 .../artemis/utils/actors/ProcessorBase.java | 135 +++
 .../utils/actors/OrderedExecutorSanityTest.java |   4 +-
 3 files changed, 99 insertions(+), 65 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/33b3eb6f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
--
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
index 8efb3d3..9903d65 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ArtemisExecutor.java
@@ -17,11 +17,10 @@
 
 package org.apache.activemq.artemis.utils.actors;
 
-import java.util.Collections;
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 public interface ArtemisExecutor extends Executor {
 
@@ -40,10 +39,24 @@ public interface ArtemisExecutor extends Executor {
   };
}
 
-   /** It will wait the current execution (if there is one) to finish
-*  but will not complete any further executions */
-   default List shutdownNow() {
-  return Collections.emptyList();
+   /**
+* It will wait the current execution (if there is one) to finish
+* but will not complete any further executions.
+*
+* @param onPendingTask it will be called for each pending task found
+* @return the number of pending tasks that won't be executed
+*/
+   default int shutdownNow(Consumer onPendingTask) {
+  return 0;
+   }
+
+   /**
+* It will wait the current execution (if there is one) to finish
+* but will not complete any further executions
+*/
+   default int shutdownNow() {
+  return shutdownNow(t -> {
+  });
}
 
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/33b3eb6f/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
--
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 1c77a52..ff6d9a1 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -17,21 +17,19 @@
 
 package org.apache.activemq.artemis.utils.actors;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.locks.LockSupport;
+import java.util.function.Consumer;
 
 import org.jboss.logging.Logger;
 
 public abstract class ProcessorBase extends HandlerBase {
 
private static final Logger logger = Logger.getLogger(ProcessorBase.class);
-
public static final int STATE_NOT_RUNNING = 0;
public static final int STATE_RUNNING = 1;
public static final int STATE_FORCED_SHUTDOWN = 2;
@@ -39,53 +37,50 @@ public abstract class ProcessorBase extends HandlerBase {
protected final Queue tasks = new ConcurrentLinkedQueue<>();
 
private final Executor delegate;
-
-   private final ExecutorTask task = new ExecutorTask();
+   /**
+* Using a method reference instead of an inner classes allows the caller 
to reduce the pointer chasing
+* when accessing ProcessorBase.this fields/methods.
+*/
+   private final Runnable task = this::executePendingTasks;
 
// used by stateUpdater
@SuppressWarnings("unused")
private volatile int state = STATE_NOT_RUNNING;
-
+  

[7/7] activemq-artemis git commit: ARTEMIS-1495 Lock-free ProcessorBase::shutdownNow

2017-11-09 Thread clebertsuconic
ARTEMIS-1495 Lock-free ProcessorBase::shutdownNow


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0fadc68c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0fadc68c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0fadc68c

Branch: refs/heads/master
Commit: 0fadc68ca503eb35d75ac95292cd85339dc8b017
Parents: 3c5b57f
Author: Francesco Nigro 
Authored: Wed Nov 8 12:03:49 2017 +0100
Committer: Clebert Suconic 
Committed: Thu Nov 9 11:58:36 2017 -0500

--
 .../artemis/utils/actors/ProcessorBase.java | 72 +---
 1 file changed, 47 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0fadc68c/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
--
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
index 8d19c22..73dbf2f 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ProcessorBase.java
@@ -21,11 +21,13 @@ import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
 
 public abstract class ProcessorBase {
 
private static final int STATE_NOT_RUNNING = 0;
private static final int STATE_RUNNING = 1;
+   private static final int STATE_FORCED_SHUTDOWN = 2;
 
protected final Queue tasks = new ConcurrentLinkedQueue<>();
 
@@ -33,12 +35,11 @@ public abstract class ProcessorBase {
 
private final ExecutorTask task = new ExecutorTask();
 
-   private final Object startedGuard = new Object();
-   private volatile boolean started = true;
-
// used by stateUpdater
@SuppressWarnings("unused")
-   private volatile int state = 0;
+   private volatile int state = STATE_NOT_RUNNING;
+
+   private volatile boolean requestedShutdown = false;
 
private static final AtomicIntegerFieldUpdater stateUpdater 
= AtomicIntegerFieldUpdater.newUpdater(ProcessorBase.class, "state");
 
@@ -47,26 +48,22 @@ public abstract class ProcessorBase {
   @Override
   public void run() {
  do {
-//if there is no thread active then we run
+//if there is no thread active and is not already dead then we run
 if (stateUpdater.compareAndSet(ProcessorBase.this, 
STATE_NOT_RUNNING, STATE_RUNNING)) {
-   T task = tasks.poll();
-   //while the queue is not empty we process in order
-
-   // All we care on started, is that a current task is not 
running as we call shutdown.
-   // for that reason this first run doesn't need to be under any 
lock
-   while (task != null && started) {
-
-  // Synchronized here is just to guarantee that a current 
task is finished before
-  // the started update can be taken as false
-  synchronized (startedGuard) {
- if (started) {
+   try {
+  T task = tasks.poll();
+  //while the queue is not empty we process in order
+  while (task != null) {
+ //just drain the tasks if has been requested a shutdown 
to help the shutdown process
+ if (!requestedShutdown) {
 doTask(task);
  }
+ task = tasks.poll();
   }
-  task = tasks.poll();
+   } finally {
+  //set state back to not running.
+  stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING);
}
-   //set state back to not running.
-   stateUpdater.set(ProcessorBase.this, STATE_NOT_RUNNING);
 } else {
return;
 }
@@ -81,10 +78,28 @@ public abstract class ProcessorBase {
/** It will wait the current execution (if there is one) to finish
 *  but will not complete any further executions */
public void shutdownNow() {
-  synchronized (startedGuard) {
- started = false;
+  //alert anyone that has been requested (at least) an immediate shutdown
+  requestedShutdown = true;
+  //it could take a very long time depending on the current executing task
+  do {
+ //alert the ExecutorTask (if is running) to just drain the current 
backlog of tasks
+ final int start

Jenkins build is still unstable: ActiveMQ-Java8-All-UnitTests #762

2017-11-09 Thread Apache Jenkins Server
See