[2/6] activemq git commit: AMQ-6858 - reworking durable subscription propagation fix
AMQ-6858 - reworking durable subscription propagation fix Significantly reworking previous fix so that the client id is properly changed when tracking network proxy subscriptions. This makes it so removal is done properly. (cherry picked from commit 41211c78d19b545a2352584d3598346aa3705be4) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/97fe20a5 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/97fe20a5 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/97fe20a5 Branch: refs/heads/activemq-5.15.x Commit: 97fe20a5721a39b70f841f303024fa30352d7336 Parents: d5a987b Author: Christopher L. Shannon Authored: Sun Nov 12 15:37:40 2017 -0500 Committer: Christopher L. Shannon (cshannon) Committed: Thu Nov 16 07:50:19 2017 -0500 -- .../apache/activemq/network/ConduitBridge.java | 8 +- .../network/DemandForwardingBridgeSupport.java | 86 ++- .../activemq/network/DemandSubscription.java| 8 + .../activemq/network/DurableConduitBridge.java | 6 +- .../DurableFiveBrokerNetworkBridgeTest.java | 576 +++ .../DurableThreeBrokerNetworkBridgeTest.java| 241 6 files changed, 659 insertions(+), 266 deletions(-) -- http://git-wip-us.apache.org/repos/asf/activemq/blob/97fe20a5/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java -- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java index 6ced896..bc9d004 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.SubscriptionInfo; @@ 
-80,7 +81,12 @@ public class ConduitBridge extends DemandForwardingBridge { ds.addForcedDurableConsumer(info.getConsumerId()); } } else { -ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); + if (isProxyNSConsumer(info)) { + final BrokerId[] path = info.getBrokerPath(); + addProxyNetworkSubscription(ds, path, info.getSubscriptionName()); + } else { + ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); + } } matched = true; // continue - we want interest to any existing DemandSubscriptions http://git-wip-us.apache.org/repos/asf/activemq/blob/97fe20a5/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java -- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index efdfa5a..03e79e4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -36,6 +36,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.management.ObjectName; @@ -94,7 +95,6 @@ import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.failover.FailoverTransport; -import org.apache.activemq.transport.tcp.SslTransport; import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IntrospectionSupport; @@ -666,11 +666,52 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br * @param info * @return */ -protected boolean isBridgeNS(ConsumerInfo info) { +protected boolean isDirectBridgeConsumer(ConsumerInfo info) { return (info.getSubscriptionName() != null &&
activemq git commit: AMQ-6858 - reworking durable subscription propagation fix
Repository: activemq Updated Branches: refs/heads/master a0a23b99c -> 41211c78d AMQ-6858 - reworking durable subscription propagation fix Significantly reworking previous fix so that the client id is properly changed when tracking network proxy subscriptions. This makes it so removal is done properly. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/41211c78 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/41211c78 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/41211c78 Branch: refs/heads/master Commit: 41211c78d19b545a2352584d3598346aa3705be4 Parents: a0a23b9 Author: Christopher L. Shannon Authored: Sun Nov 12 15:37:40 2017 -0500 Committer: Christopher L. Shannon (cshannon) Committed: Mon Nov 13 11:07:43 2017 -0500 -- .../apache/activemq/network/ConduitBridge.java | 8 +- .../network/DemandForwardingBridgeSupport.java | 86 ++- .../activemq/network/DemandSubscription.java| 8 + .../activemq/network/DurableConduitBridge.java | 6 +- .../DurableFiveBrokerNetworkBridgeTest.java | 576 +++ .../DurableThreeBrokerNetworkBridgeTest.java| 241 6 files changed, 659 insertions(+), 266 deletions(-) -- http://git-wip-us.apache.org/repos/asf/activemq/blob/41211c78/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java -- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java index 6ced896..bc9d004 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.SubscriptionInfo; 
@@ -80,7 +81,12 @@ public class ConduitBridge extends DemandForwardingBridge { ds.addForcedDurableConsumer(info.getConsumerId()); } } else { -ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); + if (isProxyNSConsumer(info)) { + final BrokerId[] path = info.getBrokerPath(); + addProxyNetworkSubscription(ds, path, info.getSubscriptionName()); + } else { + ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); + } } matched = true; // continue - we want interest to any existing DemandSubscriptions http://git-wip-us.apache.org/repos/asf/activemq/blob/41211c78/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java -- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index efdfa5a..03e79e4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -36,6 +36,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.management.ObjectName; @@ -94,7 +95,6 @@ import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportDisposedIOException; import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.failover.FailoverTransport; -import org.apache.activemq.transport.tcp.SslTransport; import org.apache.activemq.transport.tcp.TcpTransport; import org.apache.activemq.util.IdGenerator; import org.apache.activemq.util.IntrospectionSupport; @@ -666,11 +666,52 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br * @param info * @return */ -protected boolean isBridgeNS(ConsumerInfo info) { +protected boolean isDirectBridgeConsumer(ConsumerInfo info) { return (info.getSubscriptionName() != null &&