ARTEMIS-1791 Large message files are not removed after redistribution across a cluster
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/de5c0d51 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/de5c0d51 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/de5c0d51 Branch: refs/heads/master Commit: de5c0d51b976a2a3c60235da56cfd854418007a7 Parents: c69d6b0 Author: Howard Gao <howard....@gmail.com> Authored: Mon Apr 9 11:07:49 2018 +0800 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Mon Apr 9 11:06:27 2018 -0400 ---------------------------------------------------------------------- .../artemis/core/postoffice/Bindings.java | 2 ++ .../core/postoffice/impl/BindingsImpl.java | 5 ++++ .../core/postoffice/impl/PostOfficeImpl.java | 25 ++++++++++++++++---- .../core/server/cluster/impl/Redistributor.java | 1 + .../impl/WildcardAddressManagerUnitTest.java | 5 ++++ 5 files changed, 33 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java index f3592c4..30a2680 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/Bindings.java @@ -39,4 +39,6 @@ public interface Bindings extends UnproposalListener { boolean redistribute(Message message, Queue originatingQueue, RoutingContext context) throws Exception; void route(Message message, RoutingContext context) throws Exception; + + boolean allowRedistribute(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java index 2e2b31c..478c700 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java @@ -155,6 +155,11 @@ public final class BindingsImpl implements Bindings { } @Override + public boolean allowRedistribute() { + return messageLoadBalancingType.equals(MessageLoadBalancingType.ON_DEMAND); + } + + @Override public boolean redistribute(final Message message, final Queue originatingQueue, final RoutingContext context) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index b2bfe37..f1f7a38 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -970,14 +970,29 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding public Pair<RoutingContext, Message> redistribute(final Message message, final Queue originatingQueue, final Transaction tx) throws Exception { - // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message - // arrived the target node - // as described on https://issues.jboss.org/browse/JBPAPP-6130 - Message copyRedistribute = message.copy(storageManager.generateID()); Bindings bindings = addressManager.getBindingsForRoutingAddress(originatingQueue.getAddress()); - if (bindings != null) { + if (bindings != null && bindings.allowRedistribute()) { + // We have to copy the message and store it separately, otherwise we may lose remote bindings in case of restart before the message + // arrived the target node + // as described on https://issues.jboss.org/browse/JBPAPP-6130 + Message copyRedistribute = message.copy(storageManager.generateID()); + if (tx != null) { + tx.addOperation(new TransactionOperationAbstract() { + @Override + public void afterRollback(Transaction tx) { + try { + //this will cause large message file to be + //cleaned up + copyRedistribute.decrementRefCount(); + } catch (Exception e) { + logger.warn("Failed to clean up message: " + copyRedistribute); + } + } + }); + } + RoutingContext context = new RoutingContextImpl(tx); boolean routed = bindings.redistribute(copyRedistribute, originatingQueue, context); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java index cfb9eee..7982018 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java @@ -150,6 +150,7 @@ public class Redistributor implements Consumer { final Pair<RoutingContext, Message> routingInfo = postOffice.redistribute(reference.getMessage(), queue, tx); if (routingInfo == null) { + tx.rollback(); return HandleStatus.BUSY; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/de5c0d51/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java index 1c13cbd..40fadf9 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java @@ -345,6 +345,11 @@ public class WildcardAddressManagerUnitTest extends ActiveMQTestBase { public void route(Message message, RoutingContext context) throws Exception { System.out.println("routing message: " + message); } + + @Override + public boolean allowRedistribute() { + return false; + } } }