This is an automated email from the ASF dual-hosted git repository. vavrtom pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/main by this push: new 28cbf3b QPID-8558: [Broker-J] Enhancement of sole connection enforcement policy evaluation (#106) 28cbf3b is described below commit 28cbf3b2ba40e3a895c723b3af84b07810cf43c9 Author: Marek Laca <mkl...@users.noreply.github.com> AuthorDate: Mon Nov 15 09:18:01 2021 +0100 QPID-8558: [Broker-J] Enhancement of sole connection enforcement policy evaluation (#106) --- .../qpid/server/model/NamedAddressSpace.java | 5 +- .../limit/ConnectionLimiterService.java} | 9 +- .../AbstractNonConnectionAcceptingVirtualHost.java | 4 +- .../server/virtualhost/AbstractVirtualHost.java | 110 +----- .../NoopConnectionEstablishmentPolicy.java | 33 -- .../virtualhost/VirtualHostConnectionLimiter.java | 21 ++ .../apache/qpid/server/model/VirtualHostTest.java | 15 +- .../server/protocol/v0_10/ServerConnection.java | 3 +- .../protocol/v0_8/AMQPConnection_0_8Impl.java | 3 +- .../server/protocol/v1_0/AMQPConnection_1_0.java | 3 + .../protocol/v1_0/AMQPConnection_1_0Impl.java | 143 ++++---- .../soleconn/SoleConnectionDetectionPolicy.java | 49 +-- .../soleconn/SoleConnectionEnforcementPolicy.java | 48 +-- .../SoleConnectionEnforcementPolicyException.java | 52 +++ .../StrongConnectionEstablishmentLimiter.java | 218 +++++++++++ .../protocol/v1_0/ProtocolEngine_1_0_0Test.java | 15 +- .../SoleConnectionDetectionPolicyTest.java | 62 ++++ .../SoleConnectionEnforcementPolicyTest.java | 60 ++++ .../StrongConnectionEstablishmentLimiterTest.java | 398 +++++++++++++++++++++ .../management/amqp/ManagementAddressSpace.java | 5 +- 20 files changed, 945 insertions(+), 311 deletions(-) diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java index 2792a31..09341c5 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java +++ b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java @@ -34,7 +34,6 @@ import org.apache.qpid.server.protocol.LinkModel; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.server.txn.DtxRegistry; -import org.apache.qpid.server.virtualhost.ConnectionEstablishmentPolicy; import org.apache.qpid.server.virtualhost.LinkRegistryModel; public interface NamedAddressSpace extends Named @@ -46,11 +45,9 @@ public interface NamedAddressSpace extends Named MessageDestination getAttainedMessageDestination(String name, boolean mayCreate); - boolean registerConnection(AMQPConnection<?> connection, - final ConnectionEstablishmentPolicy connectionEstablishmentPolicy); + void registerConnection(AMQPConnection<?> connection); void deregisterConnection(AMQPConnection<?> connection); - String getRedirectHost(AmqpPort<?> port); Principal getPrincipal(); diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ConnectionEstablishmentPolicy.java b/broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiterService.java similarity index 74% rename from broker-core/src/main/java/org/apache/qpid/server/virtualhost/ConnectionEstablishmentPolicy.java rename to broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiterService.java index 004e3db..d526589 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ConnectionEstablishmentPolicy.java +++ b/broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiterService.java @@ -15,14 +15,11 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * */ +package org.apache.qpid.server.security.limit; -package org.apache.qpid.server.virtualhost; - -import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.server.plugin.Pluggable; -public interface ConnectionEstablishmentPolicy +public interface ConnectionLimiterService extends ConnectionLimiter, Pluggable { - boolean mayEstablishNewConnection(Iterable<AMQPConnection<?>> existingConnections, AMQPConnection<?> newConnection); } diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java index 6d75ad1..b32dd49 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java @@ -72,11 +72,9 @@ public abstract class AbstractNonConnectionAcceptingVirtualHost<X extends Abstra } @Override - public boolean registerConnection(final AMQPConnection<?> connection, - final ConnectionEstablishmentPolicy connectionEstablishmentPolicy) + public void registerConnection(final AMQPConnection<?> connection) { throwUnsupported(); - return false; } @Override diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 09c4fc1..cd1c26c 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -2646,70 +2646,22 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } @Override - public boolean registerConnection(final AMQPConnection<?> connection, - final ConnectionEstablishmentPolicy connectionEstablishmentPolicy) + public void registerConnection(final AMQPConnection<?> connection) { + if (!_acceptsConnections.get()) + { + throw new VirtualHostUnavailableException(String.format( + "VirtualHost '%s' not accepting connections", + getName())); + } _connectionLimiter.register(connection); - return doSync(registerConnectionAsync(connection, connectionEstablishmentPolicy)); - } - - public ListenableFuture<Boolean> registerConnectionAsync(final AMQPConnection<?> connection, - final ConnectionEstablishmentPolicy connectionEstablishmentPolicy) - { - return doOnConfigThread(new Task<ListenableFuture<Boolean>, RuntimeException>() + _connections.add(connection); + _totalConnectionCount.incrementAndGet(); + if (_blocked.get()) { - @Override - public ListenableFuture<Boolean> execute() - { - if (_acceptsConnections.get()) - { - if (connectionEstablishmentPolicy.mayEstablishNewConnection(_connections, connection)) - { - _connections.add(connection); - _totalConnectionCount.incrementAndGet(); - - if (_blocked.get()) - { - connection.block(); - } - - connection.pushScheduler(_networkConnectionScheduler); - return Futures.immediateFuture(true); - } - else - { - return Futures.immediateFuture(false); - } - } - else - { - final VirtualHostUnavailableException exception = - new VirtualHostUnavailableException(String.format( - "VirtualHost '%s' not accepting connections", - getName())); - return Futures.immediateFailedFuture(exception); - } - } - - @Override - public String getObject() - { - return AbstractVirtualHost.this.toString(); - } - - @Override - public String getAction() - { - return "register connection"; - } - - @Override - public String getArguments() - { - return String.valueOf(connection); - } - }); - + connection.block(); + } + connection.pushScheduler(_networkConnectionScheduler); } @Override @@ -2721,43 +2673,11 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte } finally { - doSync(deregisterConnectionAsync(connection)); - + connection.popScheduler(); + _connections.remove(connection); } } - public ListenableFuture<Void> deregisterConnectionAsync(final AMQPConnection<?> connection) - { - return doOnConfigThread(new Task<ListenableFuture<Void>, RuntimeException>() - { - @Override - public ListenableFuture<Void> execute() - { - connection.popScheduler(); - _connections.remove(connection); - return Futures.immediateFuture(null); - } - - @Override - public String getObject() - { - return AbstractVirtualHost.this.toString(); - } - - @Override - public String getAction() - { - return "deregister connection"; - } - - @Override - public String getArguments() - { - return String.valueOf(connection); - } - }); - } - @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, desiredState = State.ACTIVE) private ListenableFuture<Void> onActivate() { diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NoopConnectionEstablishmentPolicy.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NoopConnectionEstablishmentPolicy.java deleted file mode 100644 index c1c7b44..0000000 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NoopConnectionEstablishmentPolicy.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.server.virtualhost; - -import org.apache.qpid.server.transport.AMQPConnection; - -public class NoopConnectionEstablishmentPolicy implements ConnectionEstablishmentPolicy -{ - @Override - public boolean mayEstablishNewConnection(final Iterable<AMQPConnection<?>> existingConnections, - final AMQPConnection<?> newConnection) - { - return true; - } -} diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConnectionLimiter.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConnectionLimiter.java index 4188e1a..b24447e 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConnectionLimiter.java +++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConnectionLimiter.java @@ -18,10 +18,12 @@ */ package org.apache.qpid.server.virtualhost; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,10 +36,12 @@ import org.apache.qpid.server.model.State; import org.apache.qpid.server.model.SystemConfig; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.model.VirtualHostConnectionLimitProvider; +import org.apache.qpid.server.plugin.QpidServiceLoader; import org.apache.qpid.server.security.limit.CachedConnectionLimiterImpl; import org.apache.qpid.server.security.limit.ConnectionLimitProvider; import org.apache.qpid.server.security.limit.ConnectionLimiter; import org.apache.qpid.server.security.limit.ConnectionLimiter.CachedLimiter; +import org.apache.qpid.server.security.limit.ConnectionLimiterService; final class VirtualHostConnectionLimiter extends CachedConnectionLimiterImpl implements CachedLimiter { @@ -49,6 +53,8 @@ final class VirtualHostConnectionLimiter extends CachedConnectionLimiterImpl imp private final Map<ConnectionLimitProvider<?>, ConnectionLimiter> _connectionLimitProviders = new ConcurrentHashMap<>(); + private final List<ConnectionLimiterService> _serviceLimiters = new CopyOnWriteArrayList<>(); + private final ChangeListener _virtualHostChangeListener; private final ChangeListener _brokerChangeListener; @@ -70,6 +76,13 @@ final class VirtualHostConnectionLimiter extends CachedConnectionLimiterImpl imp .forEach(child -> child.addChangeListener(ProviderChangeListener.virtualHostChangeListener(this))); _broker.getChildren(BrokerConnectionLimitProvider.class) .forEach(child -> child.addChangeListener(ProviderChangeListener.brokerChangeListener(this))); + + final QpidServiceLoader serviceLoader = new QpidServiceLoader(); + for (final ConnectionLimiterService service : serviceLoader.instancesOf(ConnectionLimiterService.class)) + { + LOGGER.debug("New connection limiter service found: {}", service.getType()); + _serviceLimiters.add(service); + } } public void activate() @@ -90,6 +103,8 @@ final class VirtualHostConnectionLimiter extends CachedConnectionLimiterImpl imp _broker.getChildren(BrokerConnectionLimitProvider.class) .forEach(child -> child.removeChangeListener(brokerChangeListener)); + _serviceLimiters.clear(); + swapLimiter(ConnectionLimiter.noLimits()); } @@ -140,6 +155,12 @@ final class VirtualHostConnectionLimiter extends CachedConnectionLimiterImpl imp limiter = ConnectionLimiter.blockEveryone(); } } + + LOGGER.debug("Updating service based connection limiters"); + for (final ConnectionLimiterService service : _serviceLimiters) + { + limiter = limiter.append(service); + } return limiter; } diff --git a/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java b/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java index a5f2e21..350fda5 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java @@ -78,7 +78,6 @@ import org.apache.qpid.server.store.preferences.PreferenceStore; import org.apache.qpid.server.store.preferences.PreferenceStoreUpdater; import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.server.virtualhost.NodeAutoCreationPolicy; -import org.apache.qpid.server.virtualhost.NoopConnectionEstablishmentPolicy; import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; @@ -338,7 +337,7 @@ public class VirtualHostTest extends UnitTestBase virtualHost.getConnectionCount()); AMQPConnection modelConnection = getMockConnection(); - virtualHost.registerConnection(modelConnection, new NoopConnectionEstablishmentPolicy()); + virtualHost.registerConnection(modelConnection); assertEquals("Unexpected number of connections after connection registered", (long) 1, @@ -367,7 +366,7 @@ public class VirtualHostTest extends UnitTestBase virtualHost.getConnectionCount()); AMQPConnection modelConnection = getMockConnection(); - virtualHost.registerConnection(modelConnection, new NoopConnectionEstablishmentPolicy()); + virtualHost.registerConnection(modelConnection); assertEquals("Unexpected number of connections after connection registered", (long) 1, @@ -499,7 +498,7 @@ public class VirtualHostTest extends UnitTestBase { VirtualHost<?> host = createVirtualHost(getTestName()); AMQPConnection connection = getMockConnection(); - host.registerConnection(connection, new NoopConnectionEstablishmentPolicy()); + host.registerConnection(connection); ((EventListener) host).event(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL); verify(connection).block(); } @@ -578,7 +577,7 @@ public class VirtualHostTest extends UnitTestBase AMQPConnection<?> connection = getMockConnection(); assertEquals("unexpected number of connections before test", (long) 0, vhost.getConnectionCount()); - vhost.registerConnection(connection, new NoopConnectionEstablishmentPolicy()); + vhost.registerConnection(connection); assertEquals("unexpected number of connections after registerConnection", (long) 1, vhost.getConnectionCount()); @@ -591,7 +590,7 @@ public class VirtualHostTest extends UnitTestBase QueueManagingVirtualHost<?> vhost = createVirtualHost("sdf"); AMQPConnection<?> connection = getMockConnection(); - vhost.registerConnection(connection, new NoopConnectionEstablishmentPolicy()); + vhost.registerConnection(connection); assertEquals("unexpected number of connections after registerConnection", (long) 1, vhost.getConnectionCount()); @@ -610,7 +609,7 @@ public class VirtualHostTest extends UnitTestBase ((AbstractConfiguredObject<?>)vhost).stop(); try { - vhost.registerConnection(connection, new NoopConnectionEstablishmentPolicy()); + vhost.registerConnection(connection); fail("exception not thrown"); } catch (VirtualHostUnavailableException e) @@ -619,7 +618,7 @@ public class VirtualHostTest extends UnitTestBase } assertEquals("unexpected number of connections", (long) 0, vhost.getConnectionCount()); ((AbstractConfiguredObject<?>)vhost).start(); - vhost.registerConnection(connection, new NoopConnectionEstablishmentPolicy()); + vhost.registerConnection(connection); assertEquals("unexpected number of connections", (long) 1, vhost.getConnectionCount()); } diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index 6cc8fe8..a2d3bbd 100644 --- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -73,7 +73,6 @@ import org.apache.qpid.server.transport.ServerNetworkConnection; import org.apache.qpid.server.transport.network.NetworkConnection; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import org.apache.qpid.server.virtualhost.NoopConnectionEstablishmentPolicy; public class ServerConnection extends ConnectionInvoker { @@ -194,7 +193,7 @@ public class ServerConnection extends ConnectionInvoker public void setVirtualHost(NamedAddressSpace addressSpace) { - addressSpace.registerConnection(_amqpConnection, new NoopConnectionEstablishmentPolicy()); + addressSpace.registerConnection(_amqpConnection); _amqpConnection.setAddressSpace(addressSpace); } diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java index e30b09e..bc04386 100644 --- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java +++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java @@ -79,7 +79,6 @@ import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import org.apache.qpid.server.virtualhost.NoopConnectionEstablishmentPolicy; import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; public class AMQPConnection_0_8Impl @@ -984,7 +983,7 @@ public class AMQPConnection_0_8Impl try { - addressSpace.registerConnection(this, new NoopConnectionEstablishmentPolicy()); + addressSpace.registerConnection(this); setAddressSpace(addressSpace); if (addressSpace.authoriseCreateConnection(this)) diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java index 10a907d..ef5057e 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry; import org.apache.qpid.server.protocol.v1_0.type.FrameBody; import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy; import org.apache.qpid.server.protocol.v1_0.type.transport.End; import org.apache.qpid.server.protocol.v1_0.type.transport.Error; import org.apache.qpid.server.transport.AMQPConnection; @@ -88,4 +89,6 @@ public interface AMQPConnection_1_0<C extends AMQPConnection_1_0<C>> extends AMQ @DerivedAttribute(description = "If true send a final SASL challenge using a SaslChallenge performative, rather than SaslOutcome.") boolean getSendSaslFinalChallengeAsChallenge(); + + SoleConnectionEnforcementPolicy getSoleConnectionEnforcementPolicy(); } diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java index 8f8e351..f2d7262 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java @@ -48,8 +48,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import com.google.common.collect.Iterators; import com.google.common.collect.PeekingIterator; @@ -92,6 +90,7 @@ import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties; import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionDetectionPolicy; import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy; +import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicyException; import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge; import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode; import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit; @@ -119,7 +118,6 @@ import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManag import org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl; import org.apache.qpid.server.security.auth.sasl.SaslNegotiator; import org.apache.qpid.server.session.AMQPSession; -import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.server.transport.AbstractAMQPConnection; import org.apache.qpid.server.transport.AggregateTicker; import org.apache.qpid.server.transport.ByteBufferSender; @@ -926,86 +924,28 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio try { - final boolean registerSucceeded = addressSpace.registerConnection(this, (existingConnections, newConnection) -> - { - boolean proceedWithRegistration = true; - if (newConnection instanceof AMQPConnection_1_0Impl && !newConnection.isClosing()) - { - final List<ListenableFuture<Void>> rescheduleFutures = new ArrayList<>(); - for (AMQPConnection<?> existingConnection : StreamSupport.stream(existingConnections.spliterator(), false) - .filter(con -> con instanceof AMQPConnection_1_0) - .filter(con -> !con.isClosing()) - .filter(con -> con.getRemoteContainerName().equals(newConnection.getRemoteContainerName())) - .collect(Collectors.toList())) - { - SoleConnectionEnforcementPolicy soleConnectionEnforcementPolicy = null; - if (((AMQPConnection_1_0Impl) existingConnection)._soleConnectionEnforcementPolicy - != null) - { - soleConnectionEnforcementPolicy = - ((AMQPConnection_1_0Impl) existingConnection)._soleConnectionEnforcementPolicy; - } - else if (((AMQPConnection_1_0Impl) newConnection)._soleConnectionEnforcementPolicy != null) - { - soleConnectionEnforcementPolicy = - ((AMQPConnection_1_0Impl) newConnection)._soleConnectionEnforcementPolicy; - } - if (SoleConnectionEnforcementPolicy.REFUSE_CONNECTION.equals(soleConnectionEnforcementPolicy)) - { - _properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true); - final Error error = new Error(AmqpError.INVALID_FIELD, - String.format( - "Connection closed due to sole-connection-enforcement-policy '%s'", - String.valueOf(soleConnectionEnforcementPolicy))); - error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"), Symbol.valueOf("container-id"))); - newConnection.doOnIOThreadAsync(() -> ((AMQPConnection_1_0Impl) newConnection).closeConnection(error)); - proceedWithRegistration = false; - break; - } - else if (SoleConnectionEnforcementPolicy.CLOSE_EXISTING.equals(soleConnectionEnforcementPolicy)) - { - final Error error = new Error(AmqpError.RESOURCE_LOCKED, - String.format( - "Connection closed due to sole-connection-enforcement-policy '%s'", - String.valueOf(soleConnectionEnforcementPolicy))); - error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true)); - rescheduleFutures.add(existingConnection.doOnIOThreadAsync( - () -> ((AMQPConnection_1_0Impl) existingConnection).closeConnection(error))); - proceedWithRegistration = false; - } - } - if (!rescheduleFutures.isEmpty()) - { - doAfter(allAsList(rescheduleFutures), () -> newConnection.doOnIOThreadAsync(() -> receiveOpenInternal(addressSpace))); - } - } - return proceedWithRegistration; - }); + addressSpace.registerConnection(this); + setAddressSpace(addressSpace); - if (registerSucceeded) + if (!addressSpace.authoriseCreateConnection(this)) { - setAddressSpace(addressSpace); - - if (!addressSpace.authoriseCreateConnection(this)) + closeConnection(AmqpError.NOT_ALLOWED, "Connection refused"); + } + else + { + switch (_connectionState) { - closeConnection(AmqpError.NOT_ALLOWED, "Connection refused"); - } - else - { - switch (_connectionState) - { - case AWAIT_OPEN: - sendOpen(_channelMax, _maxFrameSize); - _connectionState = ConnectionState.OPENED; - break; - case CLOSE_SENT: - case CLOSED: - // already sent our close - probably due to an error - break; - default: - throw new ConnectionScopedRuntimeException(String.format( - "Unexpected state %s during connection open.", _connectionState)); - } + case AWAIT_OPEN: + sendOpen(_channelMax, _maxFrameSize); + _connectionState = ConnectionState.OPENED; + break; + case CLOSE_SENT: + case CLOSED: + // already sent our close - probably due to an error + break; + default: + throw new ConnectionScopedRuntimeException(String.format( + "Unexpected state %s during connection open.", _connectionState)); } } } @@ -1013,6 +953,10 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio { closeConnection(AmqpError.NOT_ALLOWED, e.getMessage()); } + catch (SoleConnectionEnforcementPolicyException e) + { + handleSoleConnectionEnforcement(addressSpace, e); + } catch (ConnectionLimitException e) { LOGGER.debug("User connection limit exceeded", e); @@ -1020,6 +964,39 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio } } + private void handleSoleConnectionEnforcement(final NamedAddressSpace addressSpace, + final SoleConnectionEnforcementPolicyException e) + { + if (isClosing()) + { + return; + } + if (e.getPolicy() == SoleConnectionEnforcementPolicy.REFUSE_CONNECTION) + { + _properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true); + final Error error = new Error(AmqpError.INVALID_FIELD, + String.format("Connection closed due to sole-connection-enforcement-policy '%s'", e.getPolicy())); + error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"), Symbol.valueOf("container-id"))); + closeConnection(error); + } + else if (e.getPolicy() == SoleConnectionEnforcementPolicy.CLOSE_EXISTING) + { + final Error error = new Error(AmqpError.RESOURCE_LOCKED, + String.format("Connection closed due to sole-connection-enforcement-policy '%s'", e.getPolicy())); + error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"), true)); + + final List<ListenableFuture<Void>> rescheduleFutures = new ArrayList<>(); + for (final AMQPConnection_1_0<?> connection : e.getExistingConnections()) + { + if (!connection.isClosing()) + { + rescheduleFutures.add(connection.doOnIOThreadAsync(() -> connection.close(error))); + } + } + doAfter(allAsList(rescheduleFutures), () -> doOnIOThreadAsync(() -> receiveOpenInternal(addressSpace))); + } + } + private void populateConnectionRedirect(final NamedAddressSpace addressSpace, final Error err) { final String redirectHost = addressSpace.getRedirectHost(getPort()); @@ -1885,6 +1862,12 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio } @Override + public SoleConnectionEnforcementPolicy getSoleConnectionEnforcementPolicy() + { + return _soleConnectionEnforcementPolicy; + } + + @Override protected boolean isOpeningInProgress() { switch (_connectionState) diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicy.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicy.java index e3a3ef5..bd66dd0 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicy.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicy.java @@ -15,26 +15,27 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * */ - package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn; +import java.util.Locale; + import org.apache.qpid.server.protocol.v1_0.type.RestrictedType; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; -public class SoleConnectionDetectionPolicy implements RestrictedType<UnsignedInteger> +public enum SoleConnectionDetectionPolicy implements RestrictedType<UnsignedInteger> { - public static final SoleConnectionDetectionPolicy - STRONG = new SoleConnectionDetectionPolicy(UnsignedInteger.valueOf(0)); - public static final SoleConnectionDetectionPolicy - WEAK = new SoleConnectionDetectionPolicy(UnsignedInteger.valueOf(1)); + STRONG(0), + WEAK(1); private final UnsignedInteger _val; - private SoleConnectionDetectionPolicy(final UnsignedInteger val) + private final String _description; + + SoleConnectionDetectionPolicy(int val) { - _val = val; + _val = UnsignedInteger.valueOf(val); + _description = name().toLowerCase(Locale.ENGLISH); } @Override @@ -45,39 +46,21 @@ public class SoleConnectionDetectionPolicy implements RestrictedType<UnsignedInt public static SoleConnectionDetectionPolicy valueOf(Object obj) { - if (obj instanceof UnsignedInteger) + for (final SoleConnectionDetectionPolicy detectionPolicy : values()) { - UnsignedInteger val = (UnsignedInteger) obj; - - if (STRONG._val.equals(val)) - { - return STRONG; - } - - if (WEAK._val.equals(val)) + if (detectionPolicy._val.equals(obj)) { - return WEAK; + return detectionPolicy; } } - final String message = String.format("Cannot convert '%s' into 'sole-connection-detection-policy'", obj); - throw new IllegalArgumentException(message); + throw new IllegalArgumentException( + String.format("Cannot convert '%s' into 'sole-connection-detection-policy'", obj)); } @Override public String toString() { - - if (this == STRONG) - { - return "strong"; - } - - if (this == WEAK) - { - return "weak"; - } - - return String.valueOf(_val); + return _description; } } diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicy.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicy.java index 86a57b1..2655d36 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicy.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicy.java @@ -15,26 +15,26 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. - * */ - package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn; +import java.util.Objects; + import org.apache.qpid.server.protocol.v1_0.type.RestrictedType; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; -public class SoleConnectionEnforcementPolicy implements RestrictedType<UnsignedInteger> +public enum SoleConnectionEnforcementPolicy implements RestrictedType<UnsignedInteger> { - public static final SoleConnectionEnforcementPolicy - REFUSE_CONNECTION = new SoleConnectionEnforcementPolicy(UnsignedInteger.valueOf(0)); - public static final SoleConnectionEnforcementPolicy - CLOSE_EXISTING = new SoleConnectionEnforcementPolicy(UnsignedInteger.valueOf(1)); + REFUSE_CONNECTION(0, "refuse-connection"), + CLOSE_EXISTING(1, "close-existing"); private final UnsignedInteger _val; + private final String _description; - private SoleConnectionEnforcementPolicy(final UnsignedInteger val) + SoleConnectionEnforcementPolicy(int val, String description) { - _val = val; + _val = UnsignedInteger.valueOf(val); + _description = Objects.requireNonNull(description); } @Override @@ -45,39 +45,21 @@ public class SoleConnectionEnforcementPolicy implements RestrictedType<UnsignedI public static SoleConnectionEnforcementPolicy valueOf(Object obj) { - if (obj instanceof UnsignedInteger) + for (final SoleConnectionEnforcementPolicy policy : values()) { - UnsignedInteger val = (UnsignedInteger) obj; - - if (REFUSE_CONNECTION._val.equals(val)) - { - return REFUSE_CONNECTION; - } - - if (CLOSE_EXISTING._val.equals(val)) + if (policy._val.equals(obj)) { - return CLOSE_EXISTING; + return policy; } } - final String message = String.format("Cannot convert '%s' into 'sole-connection-enforcement-policy'", obj); - throw new IllegalArgumentException(message); + throw new IllegalArgumentException( + String.format("Cannot convert '%s' into 'sole-connection-enforcement-policy'", obj)); } @Override public String toString() { - - if (this == REFUSE_CONNECTION) - { - return "refuse-connection"; - } - - if (this == CLOSE_EXISTING) - { - return "close-existing"; - } - - return String.valueOf(_val); + return _description; } } diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java new file mode 100644 index 0000000..8f49b7b --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0; +import org.apache.qpid.server.security.limit.ConnectionLimitException; + +public class SoleConnectionEnforcementPolicyException extends ConnectionLimitException +{ + private final Set<AMQPConnection_1_0<?>> _existingConnections; + + private final SoleConnectionEnforcementPolicy _policy; + + public SoleConnectionEnforcementPolicyException(SoleConnectionEnforcementPolicy policy, + Collection<? extends AMQPConnection_1_0<?>> connections) + { + super(String.format("Single connection is required due to sole-connection-enforcement-policy '%s'", policy)); + _policy = policy; + _existingConnections = new HashSet<>(connections); + } + + public SoleConnectionEnforcementPolicy getPolicy() + { + return _policy; + } + + public Set<AMQPConnection_1_0<?>> getExistingConnections() + { + return Collections.unmodifiableSet(_existingConnections); + } +} diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java new file mode 100644 index 0000000..c0ff83b --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.qpid.server.plugin.PluggableService; +import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0; +import org.apache.qpid.server.security.limit.ConnectionLimiter; +import org.apache.qpid.server.security.limit.ConnectionLimiterService; +import org.apache.qpid.server.security.limit.ConnectionSlot; +import org.apache.qpid.server.transport.AMQPConnection; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@PluggableService +public class StrongConnectionEstablishmentLimiter implements ConnectionLimiterService +{ + private static final Logger LOGGER = LoggerFactory.getLogger(StrongConnectionEstablishmentLimiter.class); + + private final Map<String, UsageCounter> _slots; + + private final ConnectionLimiter _underlyingLimiter; + + public StrongConnectionEstablishmentLimiter() + { + super(); + _slots = new ConcurrentHashMap<>(); + _underlyingLimiter = ConnectionLimiter.noLimits(); + } + + private StrongConnectionEstablishmentLimiter(StrongConnectionEstablishmentLimiter limiter, ConnectionLimiter underlyingLimiter) + { + super(); + _slots = limiter._slots; + _underlyingLimiter = Objects.requireNonNull(underlyingLimiter); + } + + @Override + public String getType() + { + return "EstablishmentPolicy." + SoleConnectionDetectionPolicy.STRONG; + } + + @Override + public ConnectionSlot register(AMQPConnection<?> connection) + { + if (!(connection instanceof AMQPConnection_1_0) || connection.isClosing()) + { + return _underlyingLimiter.register(connection); + } + LOGGER.debug("Registering a new connection '{}'", connection); + final AMQPConnection_1_0<?> newConnection = (AMQPConnection_1_0<?>) connection; + final String remoteContainerId = newConnection.getRemoteContainerId(); + if (remoteContainerId == null) + { + // 'container-id' is the mandatory field of open frame but could be null in integration or JUnit tests, + // e.g. when AMQPConnection_1_0 is mocked in a test. + LOGGER.warn( + "The connection '{}' without container ID, 'container-id' is the mandatory field of open frame", + connection); + return _underlyingLimiter.register(connection); + } + + LOGGER.debug("Checking a container slot for the connection '{}'", connection); + try + { + return _slots.compute(remoteContainerId, + (containerId, counter) -> counter == null ? newUsageCounter(containerId) : counter.addUser()) + .registerConnection(newConnection); + } + catch (RuntimeException e) + { + LOGGER.debug("Registering connection failed", e); + deregisterUser(remoteContainerId); + throw e; + } + } + + private void deregisterUser(final String containerId) + { + _slots.computeIfPresent(containerId, (id, slot) -> slot.removeUser()); + } + + private UsageCounter newUsageCounter(String containerId) + { + return new UsageCounter(new RemoteContainerSlot(containerId), 1L); + } + + @Override + public ConnectionLimiter append(ConnectionLimiter limiter) + { + return new StrongConnectionEstablishmentLimiter(this, _underlyingLimiter.append(limiter)); + } + + private static final class UsageCounter + { + private final long _counter; + + private final RemoteContainerSlot _slot; + + UsageCounter(RemoteContainerSlot slot, long counter) + { + _counter = counter; + _slot = Objects.requireNonNull(slot); + } + + public ConnectionSlot registerConnection(final AMQPConnection_1_0<?> connection) + { + return _slot.register(connection); + } + + public UsageCounter addUser() + { + return new UsageCounter(_slot, _counter + 1L); + } + + public UsageCounter removeUser() + { + return _counter <= 1 ? null : new UsageCounter(_slot, _counter - 1L); + } + } + + private final class RemoteContainerSlot + { + private final String _containerId; + + private final Set<AMQPConnection_1_0<?>> _connections; + + RemoteContainerSlot(String containerId) + { + super(); + _connections = new HashSet<>(); + _containerId = Objects.requireNonNull(containerId); + } + + private synchronized ConnectionSlot register(final AMQPConnection_1_0<?> connection) + { + final SoleConnectionEnforcementPolicy soleConnectionPolicy = extractPolicy(connection); + + if (soleConnectionPolicy != null && !_connections.isEmpty()) + { + LOGGER.debug("Single connection is required, sole connection policy: {}", soleConnectionPolicy); + throw new SoleConnectionEnforcementPolicyException(soleConnectionPolicy, _connections); + } + + final ConnectionSlot underlyingSlot = _underlyingLimiter.register(connection); + _connections.add(connection); + final ConnectionSlot slot = () -> + { + try + { + remove(connection); + } + finally + { + deregisterUser(_containerId); + } + }; + return slot.chainTo(underlyingSlot); + } + + private SoleConnectionEnforcementPolicy extractPolicy(AMQPConnection_1_0<?> connection) + { + if (_connections.isEmpty()) + { + return connection.getSoleConnectionEnforcementPolicy(); + } + SoleConnectionEnforcementPolicy soleConnectionPolicy = null; + + final Iterator<AMQPConnection_1_0<?>> iterator = _connections.iterator(); + while (iterator.hasNext()) + { + final AMQPConnection_1_0<?> existingConnection = iterator.next(); + if (existingConnection.isClosing()) + { + iterator.remove(); + } + else + { + soleConnectionPolicy = existingConnection.getSoleConnectionEnforcementPolicy(); + } + } + if (soleConnectionPolicy == null) + { + return connection.getSoleConnectionEnforcementPolicy(); + } + return soleConnectionPolicy; + } + + private synchronized void remove(final AMQPConnection_1_0<?> connection) + { + _connections.remove(connection); + } + } +} diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java index 3f1f10f..962fabb 100644 --- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java +++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java @@ -64,6 +64,7 @@ import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame; import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame; import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicyException; import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit; import org.apache.qpid.server.protocol.v1_0.type.transport.Open; import org.apache.qpid.server.security.SubjectCreator; @@ -75,7 +76,6 @@ import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.server.transport.AggregateTicker; import org.apache.qpid.server.transport.ByteBufferSender; import org.apache.qpid.server.transport.ServerNetworkConnection; -import org.apache.qpid.server.virtualhost.ConnectionEstablishmentPolicy; import org.apache.qpid.server.virtualhost.VirtualHostPrincipal; import org.apache.qpid.test.utils.UnitTestBase; @@ -121,16 +121,15 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase when(_virtualHost.isActive()).thenReturn(true); final ArgumentCaptor<AMQPConnection> connectionCaptor = ArgumentCaptor.forClass(AMQPConnection.class); - final ArgumentCaptor<ConnectionEstablishmentPolicy> establishmentPolicyCaptor = ArgumentCaptor.forClass(ConnectionEstablishmentPolicy.class); doAnswer(new Answer() { @Override public Object answer(final InvocationOnMock invocation) throws Throwable { _connection = connectionCaptor.getValue(); - return null; + throw new SoleConnectionEnforcementPolicyException(null, Collections.emptySet()); } - }).when(_virtualHost).registerConnection(connectionCaptor.capture(), establishmentPolicyCaptor.capture()); + }).when(_virtualHost).registerConnection(connectionCaptor.capture()); when(_virtualHost.getPrincipal()).thenReturn(mock(VirtualHostPrincipal.class)); when(_port.getAddressSpace(anyString())).thenReturn(_virtualHost); when(_port.getSubjectCreator(anyBoolean(), anyString())).thenReturn(subjectCreator); @@ -202,7 +201,7 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase open.setContainerId("testContainerId"); _frameWriter.send(new TransportFrame((int) (short) 0, open)); - verify(_virtualHost).registerConnection(any(AMQPConnection.class), any(ConnectionEstablishmentPolicy.class)); + verify(_virtualHost).registerConnection(any(AMQPConnection.class)); AuthenticatedPrincipal principal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal(); assertNotNull(principal); assertEquals(principal, @@ -224,7 +223,7 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase open.setContainerId("testContainerId"); _frameWriter.send(new TransportFrame((int) (short) 0, open)); - verify(_virtualHost, never()).registerConnection(any(AMQPConnection.class), any(ConnectionEstablishmentPolicy.class)); + verify(_virtualHost, never()).registerConnection(any(AMQPConnection.class)); verify(_networkConnection).close(); } @@ -244,7 +243,7 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase open.setContainerId("testContainerId"); _frameWriter.send(new TransportFrame((int) (short) 0, open)); - verify(_virtualHost).registerConnection(any(AMQPConnection.class), any(ConnectionEstablishmentPolicy.class)); + verify(_virtualHost).registerConnection(any(AMQPConnection.class)); AuthenticatedPrincipal authPrincipal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal(); assertNotNull(authPrincipal); assertEquals(authPrincipal, new AuthenticatedPrincipal(principal)); @@ -277,7 +276,7 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase open.setContainerId("testContainerId"); _frameWriter.send(new TransportFrame((int) (short) 0, open)); - verify(_virtualHost).registerConnection(any(AMQPConnection.class), any(ConnectionEstablishmentPolicy.class)); + verify(_virtualHost).registerConnection(any(AMQPConnection.class)); AuthenticatedPrincipal principal = (AuthenticatedPrincipal) _connection.getAuthorizedPrincipal(); assertNotNull(principal); assertEquals(principal, diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicyTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicyTest.java new file mode 100644 index 0000000..aae4e93 --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicyTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn; + +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.test.utils.UnitTestBase; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +public class SoleConnectionDetectionPolicyTest extends UnitTestBase +{ + @Test + public void testValue() + { + assertEquals(new UnsignedInteger(0), SoleConnectionDetectionPolicy.STRONG.getValue()); + assertEquals(new UnsignedInteger(1), SoleConnectionDetectionPolicy.WEAK.getValue()); + } + + @Test + public void testValueOf() + { + assertEquals(SoleConnectionDetectionPolicy.STRONG, SoleConnectionDetectionPolicy.valueOf(new UnsignedInteger(0))); + assertEquals(SoleConnectionDetectionPolicy.WEAK, SoleConnectionDetectionPolicy.valueOf(new UnsignedInteger(1))); + + try + { + SoleConnectionDetectionPolicy.valueOf(new UnsignedInteger(2)); + fail("An exception is expected"); + } + catch (RuntimeException e) + { + assertNotNull(e.getMessage()); + } + } + + @Test + public void testToString() + { + assertEquals("strong", SoleConnectionDetectionPolicy.STRONG.toString()); + assertEquals("weak", SoleConnectionDetectionPolicy.WEAK.toString()); + } +} diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyTest.java new file mode 100644 index 0000000..1e10dcd --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn; + +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.test.utils.UnitTestBase; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class SoleConnectionEnforcementPolicyTest extends UnitTestBase +{ + @Test + public void testValue() + { + assertEquals(new UnsignedInteger(0), SoleConnectionEnforcementPolicy.REFUSE_CONNECTION.getValue()); + assertEquals(new UnsignedInteger(1), SoleConnectionEnforcementPolicy.CLOSE_EXISTING.getValue()); + } + + @Test + public void testValueOf() + { + assertEquals(SoleConnectionEnforcementPolicy.REFUSE_CONNECTION, SoleConnectionEnforcementPolicy.valueOf(new UnsignedInteger(0))); + assertEquals(SoleConnectionEnforcementPolicy.CLOSE_EXISTING, SoleConnectionEnforcementPolicy.valueOf(new UnsignedInteger(1))); + + try + { + SoleConnectionEnforcementPolicy.valueOf(new UnsignedInteger(2)); + fail("An exception is expected"); + } + catch (RuntimeException e) + { + assertNotNull(e.getMessage()); + } + } + + @Test + public void testToString() + { + assertEquals("refuse-connection", SoleConnectionEnforcementPolicy.REFUSE_CONNECTION.toString()); + assertEquals("close-existing", SoleConnectionEnforcementPolicy.CLOSE_EXISTING.toString()); + } +} diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java new file mode 100644 index 0000000..fdbd0b4 --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java @@ -0,0 +1,398 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn; + +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0; +import org.apache.qpid.server.security.limit.ConnectionLimiter; +import org.apache.qpid.server.security.limit.ConnectionSlot; +import org.apache.qpid.server.transport.AMQPConnection; +import org.apache.qpid.test.utils.UnitTestBase; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class StrongConnectionEstablishmentLimiterTest extends UnitTestBase +{ + private StrongConnectionEstablishmentLimiter _limiter; + + private Registry _registry; + + @Before + public void setUp() + { + _registry = new Registry(); + _limiter = (StrongConnectionEstablishmentLimiter) new StrongConnectionEstablishmentLimiter().append(_registry); + } + + @Test + public void testType() + { + assertEquals("EstablishmentPolicy.strong", _limiter.getType()); + } + + @Test + public void testNoPolicy() + { + final AMQPConnection_1_0<?> connection1 = newConnection("C", null); + final ConnectionSlot slot1 = _limiter.register(connection1); + assertTrue(_registry.isRegistered(connection1)); + + final AMQPConnection_1_0<?> connection2 = newConnection("C", null); + final ConnectionSlot slot2 = _limiter.register(connection2); + assertTrue(_registry.isRegistered(connection2)); + + final AMQPConnection_1_0<?> connection3 = newConnection("C", null); + final ConnectionSlot slot3 = _limiter.register(connection3); + assertTrue(_registry.isRegistered(connection3)); + + slot3.free(); + assertFalse(_registry.isRegistered(connection3)); + assertTrue(_registry.hasBeenRegistered(connection3)); + + slot2.free(); + assertFalse(_registry.isRegistered(connection2)); + assertTrue(_registry.hasBeenRegistered(connection2)); + + slot1.free(); + assertFalse(_registry.isRegistered(connection1)); + assertTrue(_registry.hasBeenRegistered(connection1)); + } + + @Test + public void testNewConnectionWithPolicy() + { + final AMQPConnection_1_0<?> connection1 = newConnection("C", null); + final ConnectionSlot slot1 = _limiter.register(connection1); + assertTrue(_registry.isRegistered(connection1)); + + final AMQPConnection_1_0<?> connection2 = newConnection("C", null); + final ConnectionSlot slot2 = _limiter.register(connection2); + assertTrue(_registry.isRegistered(connection2)); + + final AMQPConnection_1_0<?> connection3 = newConnection("C", SoleConnectionEnforcementPolicy.REFUSE_CONNECTION); + try + { + _limiter.register(connection3); + fail("A sole connection enforcement policy exception is expected"); + } + catch (SoleConnectionEnforcementPolicyException e) + { + assertEquals( + "Single connection is required due to sole-connection-enforcement-policy 'refuse-connection'", + e.getMessage()); + + assertEquals(2, e.getExistingConnections().size()); + assertTrue(e.getExistingConnections().contains(connection1)); + assertTrue(e.getExistingConnections().contains(connection2)); + assertEquals(SoleConnectionEnforcementPolicy.REFUSE_CONNECTION, e.getPolicy()); + } + + slot2.free(); + assertFalse(_registry.isRegistered(connection2)); + assertTrue(_registry.hasBeenRegistered(connection2)); + + slot1.free(); + assertFalse(_registry.isRegistered(connection1)); + assertTrue(_registry.hasBeenRegistered(connection1)); + } + + @Test + public void testExistingConnectionWithPolicy() + { + final AMQPConnection_1_0<?> connection1 = newConnection("C", SoleConnectionEnforcementPolicy.CLOSE_EXISTING); + final ConnectionSlot slot1 = _limiter.register(connection1); + assertTrue(_registry.isRegistered(connection1)); + + final AMQPConnection_1_0<?> connection2 = newConnection("C", SoleConnectionEnforcementPolicy.REFUSE_CONNECTION); + try + { + _limiter.register(connection2); + fail("A sole connection enforcement policy exception is expected"); + } + catch (SoleConnectionEnforcementPolicyException e) + { + assertEquals( + "Single connection is required due to sole-connection-enforcement-policy 'close-existing'", + e.getMessage()); + + assertEquals(1, e.getExistingConnections().size()); + assertTrue(e.getExistingConnections().contains(connection1)); + assertEquals(SoleConnectionEnforcementPolicy.CLOSE_EXISTING, e.getPolicy()); + } + + slot1.free(); + assertFalse(_registry.isRegistered(connection1)); + assertTrue(_registry.hasBeenRegistered(connection1)); + } + + @Test + public void testExistingClosedConnectionWithPolicy() + { + final AMQPConnection_1_0<?> connection1 = newConnection("C", SoleConnectionEnforcementPolicy.CLOSE_EXISTING); + Mockito.doReturn(false).when(connection1).isClosing(); + final ConnectionSlot slot1 = _limiter.register(connection1); + assertTrue(_registry.isRegistered(connection1)); + + Mockito.doReturn(true).when(connection1).isClosing(); + final AMQPConnection_1_0<?> connection2 = newConnection("C", SoleConnectionEnforcementPolicy.CLOSE_EXISTING); + final ConnectionSlot slot2 = _limiter.register(connection2); + assertTrue(_registry.isRegistered(connection2)); + + slot1.free(); + assertFalse(_registry.isRegistered(connection1)); + assertTrue(_registry.hasBeenRegistered(connection1)); + + slot2.free(); + assertFalse(_registry.isRegistered(connection2)); + assertTrue(_registry.hasBeenRegistered(connection2)); + } + + @Test + public void testClosedConnection() + { + final AMQPConnection_1_0<?> connection1 = newConnection("C", SoleConnectionEnforcementPolicy.REFUSE_CONNECTION); + final ConnectionSlot slot1 = _limiter.register(connection1); + assertTrue(_registry.isRegistered(connection1)); + + final AMQPConnection_1_0<?> connection2 = newConnection("C", SoleConnectionEnforcementPolicy.CLOSE_EXISTING); + Mockito.doReturn(true).when(connection1).isClosing(); + final ConnectionSlot slot2 = _limiter.register(connection2); + assertTrue(_registry.isRegistered(connection2)); + + slot1.free(); + assertFalse(_registry.isRegistered(connection1)); + assertTrue(_registry.hasBeenRegistered(connection1)); + + slot2.free(); + assertFalse(_registry.isRegistered(connection2)); + assertTrue(_registry.hasBeenRegistered(connection2)); + } + + @Test + public void testNewConnectionWithPolicy_ClosedExisting() + { + final AMQPConnection_1_0<?> connection1 = newConnection("C", SoleConnectionEnforcementPolicy.REFUSE_CONNECTION); + Mockito.doReturn(false).when(connection1).isClosing(); + final ConnectionSlot slot1 = _limiter.register(connection1); + assertTrue(_registry.isRegistered(connection1)); + + Mockito.doReturn(true).when(connection1).isClosing(); + final AMQPConnection_1_0<?> connection2 = newConnection("C", null); + final ConnectionSlot slot2 = _limiter.register(connection2); + assertTrue(_registry.isRegistered(connection2)); + + final AMQPConnection_1_0<?> connection3 = newConnection("C", SoleConnectionEnforcementPolicy.CLOSE_EXISTING); + try + { + _limiter.register(connection3); + fail("A sole connection enforcement policy exception is expected"); + } + catch (SoleConnectionEnforcementPolicyException e) + { + assertEquals( + "Single connection is required due to sole-connection-enforcement-policy 'close-existing'", + e.getMessage()); + + assertEquals(1, e.getExistingConnections().size()); + assertTrue(e.getExistingConnections().contains(connection2)); + assertEquals(SoleConnectionEnforcementPolicy.CLOSE_EXISTING, e.getPolicy()); + } + + slot2.free(); + assertFalse(_registry.isRegistered(connection2)); + assertTrue(_registry.hasBeenRegistered(connection2)); + + slot1.free(); + assertFalse(_registry.isRegistered(connection1)); + assertTrue(_registry.hasBeenRegistered(connection1)); + } + + @Test + public void testNewConnectionWithPolicy2_ClosedExisting() + { + final AMQPConnection_1_0<?> connection1 = newConnection("C", SoleConnectionEnforcementPolicy.REFUSE_CONNECTION); + Mockito.doReturn(false).when(connection1).isClosing(); + final ConnectionSlot slot1 = _limiter.register(connection1); + assertTrue(_registry.isRegistered(connection1)); + + Mockito.doReturn(true).when(connection1).isClosing(); + final AMQPConnection_1_0<?> connection2 = newConnection("C", SoleConnectionEnforcementPolicy.REFUSE_CONNECTION); + final ConnectionSlot slot2 = _limiter.register(connection2); + assertTrue(_registry.isRegistered(connection2)); + + final AMQPConnection_1_0<?> connection3 = newConnection("C", SoleConnectionEnforcementPolicy.CLOSE_EXISTING); + try + { + _limiter.register(connection3); + fail("A sole connection enforcement policy exception is expected"); + } + catch (SoleConnectionEnforcementPolicyException e) + { + assertEquals( + "Single connection is required due to sole-connection-enforcement-policy 'refuse-connection'", + e.getMessage()); + + assertEquals(1, e.getExistingConnections().size()); + assertTrue(e.getExistingConnections().contains(connection2)); + assertEquals(SoleConnectionEnforcementPolicy.REFUSE_CONNECTION, e.getPolicy()); + } + + slot2.free(); + assertFalse(_registry.isRegistered(connection2)); + assertTrue(_registry.hasBeenRegistered(connection2)); + + slot1.free(); + assertFalse(_registry.isRegistered(connection1)); + assertTrue(_registry.hasBeenRegistered(connection1)); + } + + @Test + public void testAnotherConnectionType() + { + final AMQPConnection<?> connection = Mockito.mock(AMQPConnection.class); + final ConnectionSlot slot = _limiter.register(connection); + assertTrue(_registry.isRegistered(connection)); + slot.free(); + assertFalse(_registry.isRegistered(connection)); + assertTrue(_registry.hasBeenRegistered(connection)); + Mockito.verifyNoInteractions(connection); + } + + @Test + public void testMultipleIndependentConnections() + { + final AMQPConnection_1_0<?> connection1 = newConnection("C1", null); + final ConnectionSlot slot1 = _limiter.register(connection1); + assertTrue(_registry.isRegistered(connection1)); + + final AMQPConnection_1_0<?> connection2 = newConnection("C2", SoleConnectionEnforcementPolicy.REFUSE_CONNECTION); + final ConnectionSlot slot2 = _limiter.register(connection2); + assertTrue(_registry.isRegistered(connection2)); + + final AMQPConnection_1_0<?> connection3 = newConnection("C3", SoleConnectionEnforcementPolicy.CLOSE_EXISTING); + final ConnectionSlot slot3 = _limiter.register(connection3); + assertTrue(_registry.isRegistered(connection3)); + + slot3.free(); + assertFalse(_registry.isRegistered(connection3)); + assertTrue(_registry.hasBeenRegistered(connection3)); + + slot2.free(); + assertFalse(_registry.isRegistered(connection2)); + assertTrue(_registry.hasBeenRegistered(connection2)); + + slot1.free(); + assertFalse(_registry.isRegistered(connection1)); + assertTrue(_registry.hasBeenRegistered(connection1)); + } + + @Test + public void testMultipleIndependentConnections2() + { + final AMQPConnection_1_0<?> connection1 = newConnection(null, null); + final ConnectionSlot slot1 = _limiter.register(connection1); + assertTrue(_registry.isRegistered(connection1)); + + final AMQPConnection_1_0<?> connection2 = newConnection(null, null); + final ConnectionSlot slot2 = _limiter.register(connection2); + assertTrue(_registry.isRegistered(connection2)); + + final AMQPConnection_1_0<?> connection3 = newConnection(null, null); + final ConnectionSlot slot3 = _limiter.register(connection3); + assertTrue(_registry.isRegistered(connection3)); + + slot3.free(); + assertFalse(_registry.isRegistered(connection3)); + assertTrue(_registry.hasBeenRegistered(connection3)); + + slot2.free(); + assertFalse(_registry.isRegistered(connection2)); + assertTrue(_registry.hasBeenRegistered(connection2)); + + slot1.free(); + assertFalse(_registry.isRegistered(connection1)); + assertTrue(_registry.hasBeenRegistered(connection1)); + } + + private AMQPConnection_1_0<?> newConnection(String id, SoleConnectionEnforcementPolicy policy) + { + final AMQPConnection_1_0<?> connection = Mockito.mock(AMQPConnection_1_0.class); + Mockito.doReturn(id).when(connection).getRemoteContainerId(); + Mockito.doReturn(policy).when(connection).getSoleConnectionEnforcementPolicy(); + return connection; + } + + static final class Registry implements ConnectionLimiter + { + private final Set<AMQPConnection<?>> _registered; + + private final Set<AMQPConnection<?>> _connections; + + private final ConnectionLimiter _subLimiter; + + public Registry() + { + _registered = new HashSet<>(); + _connections = new HashSet<>(); + _subLimiter = ConnectionLimiter.noLimits(); + } + + private Registry(Registry limiter, ConnectionLimiter subLimiter) + { + _registered = limiter._registered; + _connections = limiter._connections; + _subLimiter = Objects.requireNonNull(subLimiter); + } + + @Override + public ConnectionSlot register(final AMQPConnection<?> connection) + { + final ConnectionSlot slot = _subLimiter.register(connection); + _registered.add(connection); + _connections.add(connection); + return slot.chainTo(() -> _connections.remove(connection)); + } + + @Override + public ConnectionLimiter append(ConnectionLimiter limiter) + { + return new Registry(this, _subLimiter.append(limiter)); + } + + public boolean isRegistered(AMQPConnection<?> connection) + { + return _connections.contains(connection); + } + + public boolean hasBeenRegistered(AMQPConnection<?> connection) + { + return _registered.contains(connection); + } + } +} diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java index 0e83704..d3fd813 100644 --- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java +++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java @@ -68,7 +68,6 @@ import org.apache.qpid.server.transport.AMQPConnection; import org.apache.qpid.server.txn.DtxNotSupportedException; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.util.Action; -import org.apache.qpid.server.virtualhost.ConnectionEstablishmentPolicy; import org.apache.qpid.server.virtualhost.LinkRegistryFactory; import org.apache.qpid.server.virtualhost.LinkRegistryModel; import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNode; @@ -195,11 +194,9 @@ public class ManagementAddressSpace implements NamedAddressSpace } @Override - public boolean registerConnection(final AMQPConnection<?> connection, - final ConnectionEstablishmentPolicy connectionEstablishmentPolicy) + public void registerConnection(final AMQPConnection<?> connection) { _connections.add(connection); - return true; } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org