This is an automated email from the ASF dual-hosted git repository.

vavrtom pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/main by this push:
     new 28cbf3b  QPID-8558: [Broker-J] Enhancement of sole connection 
enforcement policy evaluation (#106)
28cbf3b is described below

commit 28cbf3b2ba40e3a895c723b3af84b07810cf43c9
Author: Marek Laca <mkl...@users.noreply.github.com>
AuthorDate: Mon Nov 15 09:18:01 2021 +0100

    QPID-8558: [Broker-J] Enhancement of sole connection enforcement policy 
evaluation (#106)
---
 .../qpid/server/model/NamedAddressSpace.java       |   5 +-
 .../limit/ConnectionLimiterService.java}           |   9 +-
 .../AbstractNonConnectionAcceptingVirtualHost.java |   4 +-
 .../server/virtualhost/AbstractVirtualHost.java    | 110 +-----
 .../NoopConnectionEstablishmentPolicy.java         |  33 --
 .../virtualhost/VirtualHostConnectionLimiter.java  |  21 ++
 .../apache/qpid/server/model/VirtualHostTest.java  |  15 +-
 .../server/protocol/v0_10/ServerConnection.java    |   3 +-
 .../protocol/v0_8/AMQPConnection_0_8Impl.java      |   3 +-
 .../server/protocol/v1_0/AMQPConnection_1_0.java   |   3 +
 .../protocol/v1_0/AMQPConnection_1_0Impl.java      | 143 ++++----
 .../soleconn/SoleConnectionDetectionPolicy.java    |  49 +--
 .../soleconn/SoleConnectionEnforcementPolicy.java  |  48 +--
 .../SoleConnectionEnforcementPolicyException.java  |  52 +++
 .../StrongConnectionEstablishmentLimiter.java      | 218 +++++++++++
 .../protocol/v1_0/ProtocolEngine_1_0_0Test.java    |  15 +-
 .../SoleConnectionDetectionPolicyTest.java         |  62 ++++
 .../SoleConnectionEnforcementPolicyTest.java       |  60 ++++
 .../StrongConnectionEstablishmentLimiterTest.java  | 398 +++++++++++++++++++++
 .../management/amqp/ManagementAddressSpace.java    |   5 +-
 20 files changed, 945 insertions(+), 311 deletions(-)

diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java 
b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
index 2792a31..09341c5 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/model/NamedAddressSpace.java
@@ -34,7 +34,6 @@ import org.apache.qpid.server.protocol.LinkModel;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.DtxRegistry;
-import org.apache.qpid.server.virtualhost.ConnectionEstablishmentPolicy;
 import org.apache.qpid.server.virtualhost.LinkRegistryModel;
 
 public interface NamedAddressSpace extends Named
@@ -46,11 +45,9 @@ public interface NamedAddressSpace extends Named
 
     MessageDestination getAttainedMessageDestination(String name, boolean 
mayCreate);
 
-    boolean registerConnection(AMQPConnection<?> connection,
-                               final ConnectionEstablishmentPolicy 
connectionEstablishmentPolicy);
+    void registerConnection(AMQPConnection<?> connection);
     void deregisterConnection(AMQPConnection<?> connection);
 
-
     String getRedirectHost(AmqpPort<?> port);
 
     Principal getPrincipal();
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ConnectionEstablishmentPolicy.java
 
b/broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiterService.java
similarity index 74%
rename from 
broker-core/src/main/java/org/apache/qpid/server/virtualhost/ConnectionEstablishmentPolicy.java
rename to 
broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiterService.java
index 004e3db..d526589 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ConnectionEstablishmentPolicy.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/security/limit/ConnectionLimiterService.java
@@ -15,14 +15,11 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
+package org.apache.qpid.server.security.limit;
 
-package org.apache.qpid.server.virtualhost;
-
-import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.server.plugin.Pluggable;
 
-public interface ConnectionEstablishmentPolicy
+public interface ConnectionLimiterService extends ConnectionLimiter, Pluggable
 {
-    boolean mayEstablishNewConnection(Iterable<AMQPConnection<?>> 
existingConnections, AMQPConnection<?> newConnection);
 }
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
 
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
index 6d75ad1..b32dd49 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractNonConnectionAcceptingVirtualHost.java
@@ -72,11 +72,9 @@ public abstract class 
AbstractNonConnectionAcceptingVirtualHost<X extends Abstra
     }
 
     @Override
-    public boolean registerConnection(final AMQPConnection<?> connection,
-                                      final ConnectionEstablishmentPolicy 
connectionEstablishmentPolicy)
+    public void registerConnection(final AMQPConnection<?> connection)
     {
         throwUnsupported();
-        return false;
     }
 
     @Override
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
 
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 09c4fc1..cd1c26c 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -2646,70 +2646,22 @@ public abstract class AbstractVirtualHost<X extends 
AbstractVirtualHost<X>> exte
     }
 
     @Override
-    public boolean registerConnection(final AMQPConnection<?> connection,
-                                      final ConnectionEstablishmentPolicy 
connectionEstablishmentPolicy)
+    public void registerConnection(final AMQPConnection<?> connection)
     {
+        if (!_acceptsConnections.get())
+        {
+            throw new VirtualHostUnavailableException(String.format(
+                    "VirtualHost '%s' not accepting connections",
+                    getName()));
+        }
         _connectionLimiter.register(connection);
-        return doSync(registerConnectionAsync(connection, 
connectionEstablishmentPolicy));
-    }
-
-    public ListenableFuture<Boolean> registerConnectionAsync(final 
AMQPConnection<?> connection,
-                                                          final 
ConnectionEstablishmentPolicy connectionEstablishmentPolicy)
-    {
-        return doOnConfigThread(new Task<ListenableFuture<Boolean>, 
RuntimeException>()
+        _connections.add(connection);
+        _totalConnectionCount.incrementAndGet();
+        if (_blocked.get())
         {
-            @Override
-            public ListenableFuture<Boolean> execute()
-            {
-                if (_acceptsConnections.get())
-                {
-                    if 
(connectionEstablishmentPolicy.mayEstablishNewConnection(_connections, 
connection))
-                    {
-                        _connections.add(connection);
-                        _totalConnectionCount.incrementAndGet();
-
-                        if (_blocked.get())
-                        {
-                            connection.block();
-                        }
-
-                        connection.pushScheduler(_networkConnectionScheduler);
-                        return Futures.immediateFuture(true);
-                    }
-                    else
-                    {
-                        return Futures.immediateFuture(false);
-                    }
-                }
-                else
-                {
-                    final VirtualHostUnavailableException exception =
-                            new VirtualHostUnavailableException(String.format(
-                                    "VirtualHost '%s' not accepting 
connections",
-                                    getName()));
-                    return Futures.immediateFailedFuture(exception);
-                }
-            }
-
-            @Override
-            public String getObject()
-            {
-                return AbstractVirtualHost.this.toString();
-            }
-
-            @Override
-            public String getAction()
-            {
-                return "register connection";
-            }
-
-            @Override
-            public String getArguments()
-            {
-                return String.valueOf(connection);
-            }
-        });
-
+            connection.block();
+        }
+        connection.pushScheduler(_networkConnectionScheduler);
     }
 
     @Override
@@ -2721,43 +2673,11 @@ public abstract class AbstractVirtualHost<X extends 
AbstractVirtualHost<X>> exte
         }
         finally
         {
-            doSync(deregisterConnectionAsync(connection));
-
+            connection.popScheduler();
+            _connections.remove(connection);
         }
     }
 
-    public ListenableFuture<Void> deregisterConnectionAsync(final 
AMQPConnection<?> connection)
-    {
-        return doOnConfigThread(new Task<ListenableFuture<Void>, 
RuntimeException>()
-        {
-            @Override
-            public ListenableFuture<Void> execute()
-            {
-                connection.popScheduler();
-                _connections.remove(connection);
-                return Futures.immediateFuture(null);
-            }
-
-            @Override
-            public String getObject()
-            {
-                return AbstractVirtualHost.this.toString();
-            }
-
-            @Override
-            public String getAction()
-            {
-                return "deregister connection";
-            }
-
-            @Override
-            public String getArguments()
-            {
-                return String.valueOf(connection);
-            }
-        });
-    }
-
     @StateTransition(currentState = {State.UNINITIALIZED, State.ERRORED}, 
desiredState = State.ACTIVE)
     private ListenableFuture<Void> onActivate()
     {
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NoopConnectionEstablishmentPolicy.java
 
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NoopConnectionEstablishmentPolicy.java
deleted file mode 100644
index c1c7b44..0000000
--- 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/NoopConnectionEstablishmentPolicy.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.virtualhost;
-
-import org.apache.qpid.server.transport.AMQPConnection;
-
-public class NoopConnectionEstablishmentPolicy implements 
ConnectionEstablishmentPolicy
-{
-    @Override
-    public boolean mayEstablishNewConnection(final Iterable<AMQPConnection<?>> 
existingConnections,
-                                             final AMQPConnection<?> 
newConnection)
-    {
-        return true;
-    }
-}
diff --git 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConnectionLimiter.java
 
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConnectionLimiter.java
index 4188e1a..b24447e 100644
--- 
a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConnectionLimiter.java
+++ 
b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConnectionLimiter.java
@@ -18,10 +18,12 @@
  */
 package org.apache.qpid.server.virtualhost;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,10 +36,12 @@ import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.SystemConfig;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.VirtualHostConnectionLimitProvider;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
 import org.apache.qpid.server.security.limit.CachedConnectionLimiterImpl;
 import org.apache.qpid.server.security.limit.ConnectionLimitProvider;
 import org.apache.qpid.server.security.limit.ConnectionLimiter;
 import org.apache.qpid.server.security.limit.ConnectionLimiter.CachedLimiter;
+import org.apache.qpid.server.security.limit.ConnectionLimiterService;
 
 final class VirtualHostConnectionLimiter extends CachedConnectionLimiterImpl 
implements CachedLimiter
 {
@@ -49,6 +53,8 @@ final class VirtualHostConnectionLimiter extends 
CachedConnectionLimiterImpl imp
 
     private final Map<ConnectionLimitProvider<?>, ConnectionLimiter> 
_connectionLimitProviders = new ConcurrentHashMap<>();
 
+    private final List<ConnectionLimiterService> _serviceLimiters = new 
CopyOnWriteArrayList<>();
+
     private final ChangeListener _virtualHostChangeListener;
     private final ChangeListener _brokerChangeListener;
 
@@ -70,6 +76,13 @@ final class VirtualHostConnectionLimiter extends 
CachedConnectionLimiterImpl imp
                 .forEach(child -> 
child.addChangeListener(ProviderChangeListener.virtualHostChangeListener(this)));
         _broker.getChildren(BrokerConnectionLimitProvider.class)
                 .forEach(child -> 
child.addChangeListener(ProviderChangeListener.brokerChangeListener(this)));
+
+        final QpidServiceLoader serviceLoader = new QpidServiceLoader();
+        for (final ConnectionLimiterService service : 
serviceLoader.instancesOf(ConnectionLimiterService.class))
+        {
+            LOGGER.debug("New connection limiter service found: {}", 
service.getType());
+            _serviceLimiters.add(service);
+        }
     }
 
     public void activate()
@@ -90,6 +103,8 @@ final class VirtualHostConnectionLimiter extends 
CachedConnectionLimiterImpl imp
         _broker.getChildren(BrokerConnectionLimitProvider.class)
                 .forEach(child -> 
child.removeChangeListener(brokerChangeListener));
 
+        _serviceLimiters.clear();
+
         swapLimiter(ConnectionLimiter.noLimits());
     }
 
@@ -140,6 +155,12 @@ final class VirtualHostConnectionLimiter extends 
CachedConnectionLimiterImpl imp
                 limiter = ConnectionLimiter.blockEveryone();
             }
         }
+
+        LOGGER.debug("Updating service based connection limiters");
+        for (final ConnectionLimiterService service : _serviceLimiters)
+        {
+            limiter = limiter.append(service);
+        }
         return limiter;
     }
 
diff --git 
a/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java 
b/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
index a5f2e21..350fda5 100644
--- 
a/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
+++ 
b/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
@@ -78,7 +78,6 @@ import 
org.apache.qpid.server.store.preferences.PreferenceStore;
 import org.apache.qpid.server.store.preferences.PreferenceStoreUpdater;
 import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.virtualhost.NodeAutoCreationPolicy;
-import org.apache.qpid.server.virtualhost.NoopConnectionEstablishmentPolicy;
 import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
@@ -338,7 +337,7 @@ public class VirtualHostTest extends UnitTestBase
                             virtualHost.getConnectionCount());
 
         AMQPConnection modelConnection = getMockConnection();
-        virtualHost.registerConnection(modelConnection, new 
NoopConnectionEstablishmentPolicy());
+        virtualHost.registerConnection(modelConnection);
 
         assertEquals("Unexpected number of connections after connection 
registered",
                             (long) 1,
@@ -367,7 +366,7 @@ public class VirtualHostTest extends UnitTestBase
                             virtualHost.getConnectionCount());
 
         AMQPConnection modelConnection = getMockConnection();
-        virtualHost.registerConnection(modelConnection, new 
NoopConnectionEstablishmentPolicy());
+        virtualHost.registerConnection(modelConnection);
 
         assertEquals("Unexpected number of connections after connection 
registered",
                             (long) 1,
@@ -499,7 +498,7 @@ public class VirtualHostTest extends UnitTestBase
     {
         VirtualHost<?> host = createVirtualHost(getTestName());
         AMQPConnection connection = getMockConnection();
-        host.registerConnection(connection, new 
NoopConnectionEstablishmentPolicy());
+        host.registerConnection(connection);
         ((EventListener) host).event(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
         verify(connection).block();
     }
@@ -578,7 +577,7 @@ public class VirtualHostTest extends UnitTestBase
         AMQPConnection<?> connection = getMockConnection();
 
         assertEquals("unexpected number of connections before test", (long) 0, 
vhost.getConnectionCount());
-        vhost.registerConnection(connection, new 
NoopConnectionEstablishmentPolicy());
+        vhost.registerConnection(connection);
         assertEquals("unexpected number of connections after 
registerConnection",
                             (long) 1,
                             vhost.getConnectionCount());
@@ -591,7 +590,7 @@ public class VirtualHostTest extends UnitTestBase
         QueueManagingVirtualHost<?> vhost = createVirtualHost("sdf");
         AMQPConnection<?> connection = getMockConnection();
 
-        vhost.registerConnection(connection, new 
NoopConnectionEstablishmentPolicy());
+        vhost.registerConnection(connection);
         assertEquals("unexpected number of connections after 
registerConnection",
                             (long) 1,
                             vhost.getConnectionCount());
@@ -610,7 +609,7 @@ public class VirtualHostTest extends UnitTestBase
         ((AbstractConfiguredObject<?>)vhost).stop();
         try
         {
-            vhost.registerConnection(connection, new 
NoopConnectionEstablishmentPolicy());
+            vhost.registerConnection(connection);
             fail("exception not thrown");
         }
         catch (VirtualHostUnavailableException e)
@@ -619,7 +618,7 @@ public class VirtualHostTest extends UnitTestBase
         }
         assertEquals("unexpected number of connections", (long) 0, 
vhost.getConnectionCount());
         ((AbstractConfiguredObject<?>)vhost).start();
-        vhost.registerConnection(connection, new 
NoopConnectionEstablishmentPolicy());
+        vhost.registerConnection(connection);
         assertEquals("unexpected number of connections", (long) 1, 
vhost.getConnectionCount());
     }
 
diff --git 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 6cc8fe8..a2d3bbd 100644
--- 
a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ 
b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -73,7 +73,6 @@ import 
org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.transport.network.NetworkConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.NoopConnectionEstablishmentPolicy;
 
 public class ServerConnection extends ConnectionInvoker
 {
@@ -194,7 +193,7 @@ public class ServerConnection extends ConnectionInvoker
 
     public void setVirtualHost(NamedAddressSpace addressSpace)
     {
-        addressSpace.registerConnection(_amqpConnection, new 
NoopConnectionEstablishmentPolicy());
+        addressSpace.registerConnection(_amqpConnection);
         _amqpConnection.setAddressSpace(addressSpace);
     }
 
diff --git 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
index e30b09e..bc04386 100644
--- 
a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
+++ 
b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8Impl.java
@@ -79,7 +79,6 @@ import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.NoopConnectionEstablishmentPolicy;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 
 public class AMQPConnection_0_8Impl
@@ -984,7 +983,7 @@ public class AMQPConnection_0_8Impl
 
         try
         {
-            addressSpace.registerConnection(this, new 
NoopConnectionEstablishmentPolicy());
+            addressSpace.registerConnection(this);
             setAddressSpace(addressSpace);
 
             if (addressSpace.authoriseCreateConnection(this))
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
index 10a907d..ef5057e 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
@@ -29,6 +29,7 @@ import 
org.apache.qpid.server.protocol.v1_0.codec.SectionDecoderRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import 
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import 
org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy;
 import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.transport.AMQPConnection;
@@ -88,4 +89,6 @@ public interface AMQPConnection_1_0<C extends 
AMQPConnection_1_0<C>> extends AMQ
 
     @DerivedAttribute(description = "If true send a final SASL challenge using 
a SaslChallenge performative, rather than SaslOutcome.")
     boolean getSendSaslFinalChallengeAsChallenge();
+
+    SoleConnectionEnforcementPolicy getSoleConnectionEnforcementPolicy();
 }
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 8f8e351..f2d7262 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -48,8 +48,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.PeekingIterator;
@@ -92,6 +90,7 @@ import 
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry
 import 
org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionConnectionProperties;
 import 
org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionDetectionPolicy;
 import 
org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy;
+import 
org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicyException;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
@@ -119,7 +118,6 @@ import 
org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManag
 import 
org.apache.qpid.server.security.auth.manager.ExternalAuthenticationManagerImpl;
 import org.apache.qpid.server.security.auth.sasl.SaslNegotiator;
 import org.apache.qpid.server.session.AMQPSession;
-import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.transport.AggregateTicker;
 import org.apache.qpid.server.transport.ByteBufferSender;
@@ -926,86 +924,28 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
 
         try
         {
-            final boolean registerSucceeded = 
addressSpace.registerConnection(this, (existingConnections, newConnection) ->
-            {
-                boolean proceedWithRegistration = true;
-                if (newConnection instanceof AMQPConnection_1_0Impl && 
!newConnection.isClosing())
-                {
-                    final List<ListenableFuture<Void>> rescheduleFutures = new 
ArrayList<>();
-                    for (AMQPConnection<?> existingConnection : 
StreamSupport.stream(existingConnections.spliterator(), false)
-                            .filter(con -> con instanceof AMQPConnection_1_0)
-                            .filter(con -> !con.isClosing())
-                            .filter(con -> 
con.getRemoteContainerName().equals(newConnection.getRemoteContainerName()))
-                            .collect(Collectors.toList()))
-                    {
-                        SoleConnectionEnforcementPolicy 
soleConnectionEnforcementPolicy = null;
-                        if (((AMQPConnection_1_0Impl) 
existingConnection)._soleConnectionEnforcementPolicy
-                                != null)
-                        {
-                            soleConnectionEnforcementPolicy =
-                                    ((AMQPConnection_1_0Impl) 
existingConnection)._soleConnectionEnforcementPolicy;
-                        }
-                        else if (((AMQPConnection_1_0Impl) 
newConnection)._soleConnectionEnforcementPolicy != null)
-                        {
-                            soleConnectionEnforcementPolicy =
-                                    ((AMQPConnection_1_0Impl) 
newConnection)._soleConnectionEnforcementPolicy;
-                        }
-                        if 
(SoleConnectionEnforcementPolicy.REFUSE_CONNECTION.equals(soleConnectionEnforcementPolicy))
-                        {
-                            
_properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true);
-                            final Error error = new 
Error(AmqpError.INVALID_FIELD,
-                                    String.format(
-                                            "Connection closed due to 
sole-connection-enforcement-policy '%s'",
-                                            
String.valueOf(soleConnectionEnforcementPolicy)));
-                            
error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"), 
Symbol.valueOf("container-id")));
-                            newConnection.doOnIOThreadAsync(() -> 
((AMQPConnection_1_0Impl) newConnection).closeConnection(error));
-                            proceedWithRegistration = false;
-                            break;
-                        }
-                        else if 
(SoleConnectionEnforcementPolicy.CLOSE_EXISTING.equals(soleConnectionEnforcementPolicy))
-                        {
-                            final Error error = new 
Error(AmqpError.RESOURCE_LOCKED,
-                                    String.format(
-                                            "Connection closed due to 
sole-connection-enforcement-policy '%s'",
-                                            
String.valueOf(soleConnectionEnforcementPolicy)));
-                            
error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"),
 true));
-                            
rescheduleFutures.add(existingConnection.doOnIOThreadAsync(
-                                    () -> ((AMQPConnection_1_0Impl) 
existingConnection).closeConnection(error)));
-                            proceedWithRegistration = false;
-                        }
-                    }
-                    if (!rescheduleFutures.isEmpty())
-                    {
-                        doAfter(allAsList(rescheduleFutures), () -> 
newConnection.doOnIOThreadAsync(() -> receiveOpenInternal(addressSpace)));
-                    }
-                }
-                return proceedWithRegistration;
-            });
+            addressSpace.registerConnection(this);
+            setAddressSpace(addressSpace);
 
-            if (registerSucceeded)
+            if (!addressSpace.authoriseCreateConnection(this))
             {
-                setAddressSpace(addressSpace);
-
-                if (!addressSpace.authoriseCreateConnection(this))
+                closeConnection(AmqpError.NOT_ALLOWED, "Connection refused");
+            }
+            else
+            {
+                switch (_connectionState)
                 {
-                    closeConnection(AmqpError.NOT_ALLOWED, "Connection 
refused");
-                }
-                else
-                {
-                    switch (_connectionState)
-                    {
-                        case AWAIT_OPEN:
-                            sendOpen(_channelMax, _maxFrameSize);
-                            _connectionState = ConnectionState.OPENED;
-                            break;
-                        case CLOSE_SENT:
-                        case CLOSED:
-                            // already sent our close - probably due to an 
error
-                            break;
-                        default:
-                            throw new 
ConnectionScopedRuntimeException(String.format(
-                                    "Unexpected state %s during connection 
open.", _connectionState));
-                    }
+                    case AWAIT_OPEN:
+                        sendOpen(_channelMax, _maxFrameSize);
+                        _connectionState = ConnectionState.OPENED;
+                        break;
+                    case CLOSE_SENT:
+                    case CLOSED:
+                        // already sent our close - probably due to an error
+                        break;
+                    default:
+                        throw new 
ConnectionScopedRuntimeException(String.format(
+                                "Unexpected state %s during connection open.", 
_connectionState));
                 }
             }
         }
@@ -1013,6 +953,10 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
         {
             closeConnection(AmqpError.NOT_ALLOWED, e.getMessage());
         }
+        catch (SoleConnectionEnforcementPolicyException e)
+        {
+            handleSoleConnectionEnforcement(addressSpace, e);
+        }
         catch (ConnectionLimitException e)
         {
             LOGGER.debug("User connection limit exceeded", e);
@@ -1020,6 +964,39 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
         }
     }
 
+    private void handleSoleConnectionEnforcement(final NamedAddressSpace 
addressSpace,
+                                                 final 
SoleConnectionEnforcementPolicyException e)
+    {
+        if (isClosing())
+        {
+            return;
+        }
+        if (e.getPolicy() == SoleConnectionEnforcementPolicy.REFUSE_CONNECTION)
+        {
+            
_properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true);
+            final Error error = new Error(AmqpError.INVALID_FIELD,
+                    String.format("Connection closed due to 
sole-connection-enforcement-policy '%s'", e.getPolicy()));
+            
error.setInfo(Collections.singletonMap(Symbol.valueOf("invalid-field"), 
Symbol.valueOf("container-id")));
+            closeConnection(error);
+        }
+        else if (e.getPolicy() == 
SoleConnectionEnforcementPolicy.CLOSE_EXISTING)
+        {
+            final Error error = new Error(AmqpError.RESOURCE_LOCKED,
+                    String.format("Connection closed due to 
sole-connection-enforcement-policy '%s'", e.getPolicy()));
+            
error.setInfo(Collections.singletonMap(Symbol.valueOf("sole-connection-enforcement"),
 true));
+
+            final List<ListenableFuture<Void>> rescheduleFutures = new 
ArrayList<>();
+            for (final AMQPConnection_1_0<?> connection : 
e.getExistingConnections())
+            {
+                if (!connection.isClosing())
+                {
+                    rescheduleFutures.add(connection.doOnIOThreadAsync(() -> 
connection.close(error)));
+                }
+            }
+            doAfter(allAsList(rescheduleFutures), () -> doOnIOThreadAsync(() 
-> receiveOpenInternal(addressSpace)));
+        }
+    }
+
     private void populateConnectionRedirect(final NamedAddressSpace 
addressSpace, final Error err)
     {
         final String redirectHost = addressSpace.getRedirectHost(getPort());
@@ -1885,6 +1862,12 @@ public class AMQPConnection_1_0Impl extends 
AbstractAMQPConnection<AMQPConnectio
     }
 
     @Override
+    public SoleConnectionEnforcementPolicy getSoleConnectionEnforcementPolicy()
+    {
+        return _soleConnectionEnforcementPolicy;
+    }
+
+    @Override
     protected boolean isOpeningInProgress()
     {
         switch (_connectionState)
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicy.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicy.java
index e3a3ef5..bd66dd0 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicy.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicy.java
@@ -15,26 +15,27 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
-
 package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn;
 
+import java.util.Locale;
+
 import org.apache.qpid.server.protocol.v1_0.type.RestrictedType;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 
-public class SoleConnectionDetectionPolicy implements 
RestrictedType<UnsignedInteger>
+public enum SoleConnectionDetectionPolicy implements 
RestrictedType<UnsignedInteger>
 {
-    public static final SoleConnectionDetectionPolicy
-            STRONG = new 
SoleConnectionDetectionPolicy(UnsignedInteger.valueOf(0));
-    public static final SoleConnectionDetectionPolicy
-            WEAK = new 
SoleConnectionDetectionPolicy(UnsignedInteger.valueOf(1));
+    STRONG(0),
+    WEAK(1);
 
     private final UnsignedInteger _val;
 
-    private SoleConnectionDetectionPolicy(final UnsignedInteger val)
+    private final String _description;
+
+    SoleConnectionDetectionPolicy(int val)
     {
-        _val = val;
+        _val = UnsignedInteger.valueOf(val);
+        _description = name().toLowerCase(Locale.ENGLISH);
     }
 
     @Override
@@ -45,39 +46,21 @@ public class SoleConnectionDetectionPolicy implements 
RestrictedType<UnsignedInt
 
     public static SoleConnectionDetectionPolicy valueOf(Object obj)
     {
-        if (obj instanceof UnsignedInteger)
+        for (final SoleConnectionDetectionPolicy detectionPolicy : values())
         {
-            UnsignedInteger val = (UnsignedInteger) obj;
-
-            if (STRONG._val.equals(val))
-            {
-                return STRONG;
-            }
-
-            if (WEAK._val.equals(val))
+            if (detectionPolicy._val.equals(obj))
             {
-                return WEAK;
+                return detectionPolicy;
             }
         }
 
-        final String message = String.format("Cannot convert '%s' into 
'sole-connection-detection-policy'", obj);
-        throw new IllegalArgumentException(message);
+        throw new IllegalArgumentException(
+                String.format("Cannot convert '%s' into 
'sole-connection-detection-policy'", obj));
     }
 
     @Override
     public String toString()
     {
-
-        if (this == STRONG)
-        {
-            return "strong";
-        }
-
-        if (this == WEAK)
-        {
-            return "weak";
-        }
-
-        return String.valueOf(_val);
+        return _description;
     }
 }
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicy.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicy.java
index 86a57b1..2655d36 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicy.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicy.java
@@ -15,26 +15,26 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
-
 package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn;
 
+import java.util.Objects;
+
 import org.apache.qpid.server.protocol.v1_0.type.RestrictedType;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 
-public class SoleConnectionEnforcementPolicy implements 
RestrictedType<UnsignedInteger>
+public enum SoleConnectionEnforcementPolicy implements 
RestrictedType<UnsignedInteger>
 {
-    public static final SoleConnectionEnforcementPolicy
-            REFUSE_CONNECTION = new 
SoleConnectionEnforcementPolicy(UnsignedInteger.valueOf(0));
-    public static final SoleConnectionEnforcementPolicy
-            CLOSE_EXISTING = new 
SoleConnectionEnforcementPolicy(UnsignedInteger.valueOf(1));
+    REFUSE_CONNECTION(0, "refuse-connection"),
+    CLOSE_EXISTING(1, "close-existing");
 
     private final UnsignedInteger _val;
+    private final String _description;
 
-    private SoleConnectionEnforcementPolicy(final UnsignedInteger val)
+    SoleConnectionEnforcementPolicy(int val, String description)
     {
-        _val = val;
+        _val = UnsignedInteger.valueOf(val);
+        _description = Objects.requireNonNull(description);
     }
 
     @Override
@@ -45,39 +45,21 @@ public class SoleConnectionEnforcementPolicy implements 
RestrictedType<UnsignedI
 
     public static SoleConnectionEnforcementPolicy valueOf(Object obj)
     {
-        if (obj instanceof UnsignedInteger)
+        for (final SoleConnectionEnforcementPolicy policy : values())
         {
-            UnsignedInteger val = (UnsignedInteger) obj;
-
-            if (REFUSE_CONNECTION._val.equals(val))
-            {
-                return REFUSE_CONNECTION;
-            }
-
-            if (CLOSE_EXISTING._val.equals(val))
+            if (policy._val.equals(obj))
             {
-                return CLOSE_EXISTING;
+                return policy;
             }
         }
 
-        final String message = String.format("Cannot convert '%s' into 
'sole-connection-enforcement-policy'", obj);
-        throw new IllegalArgumentException(message);
+        throw new IllegalArgumentException(
+                String.format("Cannot convert '%s' into 
'sole-connection-enforcement-policy'", obj));
     }
 
     @Override
     public String toString()
     {
-
-        if (this == REFUSE_CONNECTION)
-        {
-            return "refuse-connection";
-        }
-
-        if (this == CLOSE_EXISTING)
-        {
-            return "close-existing";
-        }
-
-        return String.valueOf(_val);
+        return _description;
     }
 }
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java
new file mode 100644
index 0000000..8f49b7b
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0;
+import org.apache.qpid.server.security.limit.ConnectionLimitException;
+
+public class SoleConnectionEnforcementPolicyException extends 
ConnectionLimitException
+{
+    private final Set<AMQPConnection_1_0<?>> _existingConnections;
+
+    private final SoleConnectionEnforcementPolicy _policy;
+
+    public 
SoleConnectionEnforcementPolicyException(SoleConnectionEnforcementPolicy policy,
+                                                    Collection<? extends 
AMQPConnection_1_0<?>> connections)
+    {
+        super(String.format("Single connection is required due to 
sole-connection-enforcement-policy '%s'", policy));
+        _policy = policy;
+        _existingConnections = new HashSet<>(connections);
+    }
+
+    public SoleConnectionEnforcementPolicy getPolicy()
+    {
+        return _policy;
+    }
+
+    public Set<AMQPConnection_1_0<?>> getExistingConnections()
+    {
+        return Collections.unmodifiableSet(_existingConnections);
+    }
+}
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java
new file mode 100644
index 0000000..c0ff83b
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiter.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0;
+import org.apache.qpid.server.security.limit.ConnectionLimiter;
+import org.apache.qpid.server.security.limit.ConnectionLimiterService;
+import org.apache.qpid.server.security.limit.ConnectionSlot;
+import org.apache.qpid.server.transport.AMQPConnection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@PluggableService
+public class StrongConnectionEstablishmentLimiter implements 
ConnectionLimiterService
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(StrongConnectionEstablishmentLimiter.class);
+
+    private final Map<String, UsageCounter> _slots;
+
+    private final ConnectionLimiter _underlyingLimiter;
+
+    public StrongConnectionEstablishmentLimiter()
+    {
+        super();
+        _slots = new ConcurrentHashMap<>();
+        _underlyingLimiter = ConnectionLimiter.noLimits();
+    }
+
+    private 
StrongConnectionEstablishmentLimiter(StrongConnectionEstablishmentLimiter 
limiter, ConnectionLimiter underlyingLimiter)
+    {
+        super();
+        _slots = limiter._slots;
+        _underlyingLimiter = Objects.requireNonNull(underlyingLimiter);
+    }
+
+    @Override
+    public String getType()
+    {
+        return "EstablishmentPolicy." + SoleConnectionDetectionPolicy.STRONG;
+    }
+
+    @Override
+    public ConnectionSlot register(AMQPConnection<?> connection)
+    {
+        if (!(connection instanceof AMQPConnection_1_0) || 
connection.isClosing())
+        {
+            return _underlyingLimiter.register(connection);
+        }
+        LOGGER.debug("Registering a new connection '{}'", connection);
+        final AMQPConnection_1_0<?> newConnection = (AMQPConnection_1_0<?>) 
connection;
+        final String remoteContainerId = newConnection.getRemoteContainerId();
+        if (remoteContainerId == null)
+        {
+            // 'container-id' is the mandatory field of open frame but could 
be null in integration or JUnit tests,
+            // e.g. when AMQPConnection_1_0 is mocked in a test.
+            LOGGER.warn(
+                    "The connection '{}' without container ID, 'container-id' 
is the mandatory field of open frame",
+                    connection);
+            return _underlyingLimiter.register(connection);
+        }
+
+        LOGGER.debug("Checking a container slot for the connection '{}'", 
connection);
+        try
+        {
+            return _slots.compute(remoteContainerId,
+                            (containerId, counter) -> counter == null ? 
newUsageCounter(containerId) : counter.addUser())
+                    .registerConnection(newConnection);
+        }
+        catch (RuntimeException e)
+        {
+            LOGGER.debug("Registering connection failed", e);
+            deregisterUser(remoteContainerId);
+            throw e;
+        }
+    }
+
+    private void deregisterUser(final String containerId)
+    {
+        _slots.computeIfPresent(containerId, (id, slot) -> slot.removeUser());
+    }
+
+    private UsageCounter newUsageCounter(String containerId)
+    {
+        return new UsageCounter(new RemoteContainerSlot(containerId), 1L);
+    }
+
+    @Override
+    public ConnectionLimiter append(ConnectionLimiter limiter)
+    {
+        return new StrongConnectionEstablishmentLimiter(this, 
_underlyingLimiter.append(limiter));
+    }
+
+    private static final class UsageCounter
+    {
+        private final long _counter;
+
+        private final RemoteContainerSlot _slot;
+
+        UsageCounter(RemoteContainerSlot slot, long counter)
+        {
+            _counter = counter;
+            _slot = Objects.requireNonNull(slot);
+        }
+
+        public ConnectionSlot registerConnection(final AMQPConnection_1_0<?> 
connection)
+        {
+            return _slot.register(connection);
+        }
+
+        public UsageCounter addUser()
+        {
+            return new UsageCounter(_slot, _counter + 1L);
+        }
+
+        public UsageCounter removeUser()
+        {
+            return _counter <= 1 ? null : new UsageCounter(_slot, _counter - 
1L);
+        }
+    }
+
+    private final class RemoteContainerSlot
+    {
+        private final String _containerId;
+
+        private final Set<AMQPConnection_1_0<?>> _connections;
+
+        RemoteContainerSlot(String containerId)
+        {
+            super();
+            _connections = new HashSet<>();
+            _containerId = Objects.requireNonNull(containerId);
+        }
+
+        private synchronized ConnectionSlot register(final 
AMQPConnection_1_0<?> connection)
+        {
+            final SoleConnectionEnforcementPolicy soleConnectionPolicy = 
extractPolicy(connection);
+
+            if (soleConnectionPolicy != null && !_connections.isEmpty())
+            {
+                LOGGER.debug("Single connection is required, sole connection 
policy: {}", soleConnectionPolicy);
+                throw new 
SoleConnectionEnforcementPolicyException(soleConnectionPolicy, _connections);
+            }
+
+            final ConnectionSlot underlyingSlot = 
_underlyingLimiter.register(connection);
+            _connections.add(connection);
+            final ConnectionSlot slot = () ->
+            {
+                try
+                {
+                    remove(connection);
+                }
+                finally
+                {
+                    deregisterUser(_containerId);
+                }
+            };
+            return slot.chainTo(underlyingSlot);
+        }
+
+        private SoleConnectionEnforcementPolicy 
extractPolicy(AMQPConnection_1_0<?> connection)
+        {
+            if (_connections.isEmpty())
+            {
+                return connection.getSoleConnectionEnforcementPolicy();
+            }
+            SoleConnectionEnforcementPolicy soleConnectionPolicy = null;
+
+            final Iterator<AMQPConnection_1_0<?>> iterator = 
_connections.iterator();
+            while (iterator.hasNext())
+            {
+                final AMQPConnection_1_0<?> existingConnection = 
iterator.next();
+                if (existingConnection.isClosing())
+                {
+                    iterator.remove();
+                }
+                else
+                {
+                    soleConnectionPolicy = 
existingConnection.getSoleConnectionEnforcementPolicy();
+                }
+            }
+            if (soleConnectionPolicy == null)
+            {
+                return connection.getSoleConnectionEnforcementPolicy();
+            }
+            return soleConnectionPolicy;
+        }
+
+        private synchronized void remove(final AMQPConnection_1_0<?> 
connection)
+        {
+            _connections.remove(connection);
+        }
+    }
+}
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
index 3f1f10f..962fabb 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0Test.java
@@ -64,6 +64,7 @@ import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame;
 import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import 
org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import 
org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicyException;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.security.SubjectCreator;
@@ -75,7 +76,6 @@ import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.transport.AggregateTicker;
 import org.apache.qpid.server.transport.ByteBufferSender;
 import org.apache.qpid.server.transport.ServerNetworkConnection;
-import org.apache.qpid.server.virtualhost.ConnectionEstablishmentPolicy;
 import org.apache.qpid.server.virtualhost.VirtualHostPrincipal;
 import org.apache.qpid.test.utils.UnitTestBase;
 
@@ -121,16 +121,15 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase
         when(_virtualHost.isActive()).thenReturn(true);
 
         final ArgumentCaptor<AMQPConnection> connectionCaptor = 
ArgumentCaptor.forClass(AMQPConnection.class);
-        final ArgumentCaptor<ConnectionEstablishmentPolicy> 
establishmentPolicyCaptor = 
ArgumentCaptor.forClass(ConnectionEstablishmentPolicy.class);
         doAnswer(new Answer()
         {
             @Override
             public Object answer(final InvocationOnMock invocation) throws 
Throwable
             {
                 _connection = connectionCaptor.getValue();
-                return null;
+                throw new SoleConnectionEnforcementPolicyException(null, 
Collections.emptySet());
             }
-        }).when(_virtualHost).registerConnection(connectionCaptor.capture(), 
establishmentPolicyCaptor.capture());
+        }).when(_virtualHost).registerConnection(connectionCaptor.capture());
         
when(_virtualHost.getPrincipal()).thenReturn(mock(VirtualHostPrincipal.class));
         when(_port.getAddressSpace(anyString())).thenReturn(_virtualHost);
         when(_port.getSubjectCreator(anyBoolean(), 
anyString())).thenReturn(subjectCreator);
@@ -202,7 +201,7 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase
         open.setContainerId("testContainerId");
         _frameWriter.send(new TransportFrame((int) (short) 0, open));
 
-        verify(_virtualHost).registerConnection(any(AMQPConnection.class), 
any(ConnectionEstablishmentPolicy.class));
+        verify(_virtualHost).registerConnection(any(AMQPConnection.class));
         AuthenticatedPrincipal principal = (AuthenticatedPrincipal) 
_connection.getAuthorizedPrincipal();
         assertNotNull(principal);
         assertEquals(principal,
@@ -224,7 +223,7 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase
         open.setContainerId("testContainerId");
         _frameWriter.send(new TransportFrame((int) (short) 0, open));
 
-        verify(_virtualHost, 
never()).registerConnection(any(AMQPConnection.class), 
any(ConnectionEstablishmentPolicy.class));
+        verify(_virtualHost, 
never()).registerConnection(any(AMQPConnection.class));
         verify(_networkConnection).close();
     }
 
@@ -244,7 +243,7 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase
         open.setContainerId("testContainerId");
         _frameWriter.send(new TransportFrame((int) (short) 0, open));
 
-        verify(_virtualHost).registerConnection(any(AMQPConnection.class), 
any(ConnectionEstablishmentPolicy.class));
+        verify(_virtualHost).registerConnection(any(AMQPConnection.class));
         AuthenticatedPrincipal authPrincipal = (AuthenticatedPrincipal) 
_connection.getAuthorizedPrincipal();
         assertNotNull(authPrincipal);
         assertEquals(authPrincipal, new AuthenticatedPrincipal(principal));
@@ -277,7 +276,7 @@ public class ProtocolEngine_1_0_0Test extends UnitTestBase
         open.setContainerId("testContainerId");
         _frameWriter.send(new TransportFrame((int) (short) 0, open));
 
-        verify(_virtualHost).registerConnection(any(AMQPConnection.class), 
any(ConnectionEstablishmentPolicy.class));
+        verify(_virtualHost).registerConnection(any(AMQPConnection.class));
         AuthenticatedPrincipal principal = (AuthenticatedPrincipal) 
_connection.getAuthorizedPrincipal();
         assertNotNull(principal);
         assertEquals(principal,
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicyTest.java
 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicyTest.java
new file mode 100644
index 0000000..aae4e93
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionDetectionPolicyTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn;
+
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+public class SoleConnectionDetectionPolicyTest extends UnitTestBase
+{
+    @Test
+    public void testValue()
+    {
+        assertEquals(new UnsignedInteger(0), 
SoleConnectionDetectionPolicy.STRONG.getValue());
+        assertEquals(new UnsignedInteger(1), 
SoleConnectionDetectionPolicy.WEAK.getValue());
+    }
+
+    @Test
+    public void testValueOf()
+    {
+        assertEquals(SoleConnectionDetectionPolicy.STRONG, 
SoleConnectionDetectionPolicy.valueOf(new UnsignedInteger(0)));
+        assertEquals(SoleConnectionDetectionPolicy.WEAK, 
SoleConnectionDetectionPolicy.valueOf(new UnsignedInteger(1)));
+
+        try
+        {
+            SoleConnectionDetectionPolicy.valueOf(new UnsignedInteger(2));
+            fail("An exception is expected");
+        }
+        catch (RuntimeException e)
+        {
+            assertNotNull(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testToString()
+    {
+        assertEquals("strong", 
SoleConnectionDetectionPolicy.STRONG.toString());
+        assertEquals("weak", SoleConnectionDetectionPolicy.WEAK.toString());
+    }
+}
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyTest.java
 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyTest.java
new file mode 100644
index 0000000..1e10dcd
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/SoleConnectionEnforcementPolicyTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn;
+
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class SoleConnectionEnforcementPolicyTest extends UnitTestBase
+{
+    @Test
+    public void testValue()
+    {
+        assertEquals(new UnsignedInteger(0), 
SoleConnectionEnforcementPolicy.REFUSE_CONNECTION.getValue());
+        assertEquals(new UnsignedInteger(1), 
SoleConnectionEnforcementPolicy.CLOSE_EXISTING.getValue());
+    }
+
+    @Test
+    public void testValueOf()
+    {
+        assertEquals(SoleConnectionEnforcementPolicy.REFUSE_CONNECTION, 
SoleConnectionEnforcementPolicy.valueOf(new UnsignedInteger(0)));
+        assertEquals(SoleConnectionEnforcementPolicy.CLOSE_EXISTING, 
SoleConnectionEnforcementPolicy.valueOf(new UnsignedInteger(1)));
+
+        try
+        {
+            SoleConnectionEnforcementPolicy.valueOf(new UnsignedInteger(2));
+            fail("An exception is expected");
+        }
+        catch (RuntimeException e)
+        {
+            assertNotNull(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testToString()
+    {
+        assertEquals("refuse-connection", 
SoleConnectionEnforcementPolicy.REFUSE_CONNECTION.toString());
+        assertEquals("close-existing", 
SoleConnectionEnforcementPolicy.CLOSE_EXISTING.toString());
+    }
+}
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java
 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java
new file mode 100644
index 0000000..fdbd0b4
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/type/extensions/soleconn/StrongConnectionEstablishmentLimiterTest.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn;
+
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.qpid.server.protocol.v1_0.AMQPConnection_1_0;
+import org.apache.qpid.server.security.limit.ConnectionLimiter;
+import org.apache.qpid.server.security.limit.ConnectionSlot;
+import org.apache.qpid.server.transport.AMQPConnection;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class StrongConnectionEstablishmentLimiterTest extends UnitTestBase
+{
+    private StrongConnectionEstablishmentLimiter _limiter;
+
+    private Registry _registry;
+
+    @Before
+    public void setUp()
+    {
+        _registry = new Registry();
+        _limiter = (StrongConnectionEstablishmentLimiter) new 
StrongConnectionEstablishmentLimiter().append(_registry);
+    }
+
+    @Test
+    public void testType()
+    {
+        assertEquals("EstablishmentPolicy.strong", _limiter.getType());
+    }
+
+    @Test
+    public void testNoPolicy()
+    {
+        final AMQPConnection_1_0<?> connection1 = newConnection("C", null);
+        final ConnectionSlot slot1 = _limiter.register(connection1);
+        assertTrue(_registry.isRegistered(connection1));
+
+        final AMQPConnection_1_0<?> connection2 = newConnection("C", null);
+        final ConnectionSlot slot2 = _limiter.register(connection2);
+        assertTrue(_registry.isRegistered(connection2));
+
+        final AMQPConnection_1_0<?> connection3 = newConnection("C", null);
+        final ConnectionSlot slot3 = _limiter.register(connection3);
+        assertTrue(_registry.isRegistered(connection3));
+
+        slot3.free();
+        assertFalse(_registry.isRegistered(connection3));
+        assertTrue(_registry.hasBeenRegistered(connection3));
+
+        slot2.free();
+        assertFalse(_registry.isRegistered(connection2));
+        assertTrue(_registry.hasBeenRegistered(connection2));
+
+        slot1.free();
+        assertFalse(_registry.isRegistered(connection1));
+        assertTrue(_registry.hasBeenRegistered(connection1));
+    }
+
+    @Test
+    public void testNewConnectionWithPolicy()
+    {
+        final AMQPConnection_1_0<?> connection1 = newConnection("C", null);
+        final ConnectionSlot slot1 = _limiter.register(connection1);
+        assertTrue(_registry.isRegistered(connection1));
+
+        final AMQPConnection_1_0<?> connection2 = newConnection("C", null);
+        final ConnectionSlot slot2 = _limiter.register(connection2);
+        assertTrue(_registry.isRegistered(connection2));
+
+        final AMQPConnection_1_0<?> connection3 = newConnection("C", 
SoleConnectionEnforcementPolicy.REFUSE_CONNECTION);
+        try
+        {
+            _limiter.register(connection3);
+            fail("A sole connection enforcement policy exception is expected");
+        }
+        catch (SoleConnectionEnforcementPolicyException e)
+        {
+            assertEquals(
+                    "Single connection is required due to 
sole-connection-enforcement-policy 'refuse-connection'",
+                    e.getMessage());
+
+            assertEquals(2, e.getExistingConnections().size());
+            assertTrue(e.getExistingConnections().contains(connection1));
+            assertTrue(e.getExistingConnections().contains(connection2));
+            assertEquals(SoleConnectionEnforcementPolicy.REFUSE_CONNECTION, 
e.getPolicy());
+        }
+
+        slot2.free();
+        assertFalse(_registry.isRegistered(connection2));
+        assertTrue(_registry.hasBeenRegistered(connection2));
+
+        slot1.free();
+        assertFalse(_registry.isRegistered(connection1));
+        assertTrue(_registry.hasBeenRegistered(connection1));
+    }
+
+    @Test
+    public void testExistingConnectionWithPolicy()
+    {
+        final AMQPConnection_1_0<?> connection1 = newConnection("C", 
SoleConnectionEnforcementPolicy.CLOSE_EXISTING);
+        final ConnectionSlot slot1 = _limiter.register(connection1);
+        assertTrue(_registry.isRegistered(connection1));
+
+        final AMQPConnection_1_0<?> connection2 = newConnection("C", 
SoleConnectionEnforcementPolicy.REFUSE_CONNECTION);
+        try
+        {
+            _limiter.register(connection2);
+            fail("A sole connection enforcement policy exception is expected");
+        }
+        catch (SoleConnectionEnforcementPolicyException e)
+        {
+            assertEquals(
+                    "Single connection is required due to 
sole-connection-enforcement-policy 'close-existing'",
+                    e.getMessage());
+
+            assertEquals(1, e.getExistingConnections().size());
+            assertTrue(e.getExistingConnections().contains(connection1));
+            assertEquals(SoleConnectionEnforcementPolicy.CLOSE_EXISTING, 
e.getPolicy());
+        }
+
+        slot1.free();
+        assertFalse(_registry.isRegistered(connection1));
+        assertTrue(_registry.hasBeenRegistered(connection1));
+    }
+
+    @Test
+    public void testExistingClosedConnectionWithPolicy()
+    {
+        final AMQPConnection_1_0<?> connection1 = newConnection("C", 
SoleConnectionEnforcementPolicy.CLOSE_EXISTING);
+        Mockito.doReturn(false).when(connection1).isClosing();
+        final ConnectionSlot slot1 = _limiter.register(connection1);
+        assertTrue(_registry.isRegistered(connection1));
+
+        Mockito.doReturn(true).when(connection1).isClosing();
+        final AMQPConnection_1_0<?> connection2 = newConnection("C", 
SoleConnectionEnforcementPolicy.CLOSE_EXISTING);
+        final ConnectionSlot slot2 = _limiter.register(connection2);
+        assertTrue(_registry.isRegistered(connection2));
+
+        slot1.free();
+        assertFalse(_registry.isRegistered(connection1));
+        assertTrue(_registry.hasBeenRegistered(connection1));
+
+        slot2.free();
+        assertFalse(_registry.isRegistered(connection2));
+        assertTrue(_registry.hasBeenRegistered(connection2));
+    }
+
+    @Test
+    public void testClosedConnection()
+    {
+        final AMQPConnection_1_0<?> connection1 = newConnection("C", 
SoleConnectionEnforcementPolicy.REFUSE_CONNECTION);
+        final ConnectionSlot slot1 = _limiter.register(connection1);
+        assertTrue(_registry.isRegistered(connection1));
+
+        final AMQPConnection_1_0<?> connection2 = newConnection("C", 
SoleConnectionEnforcementPolicy.CLOSE_EXISTING);
+        Mockito.doReturn(true).when(connection1).isClosing();
+        final ConnectionSlot slot2 = _limiter.register(connection2);
+        assertTrue(_registry.isRegistered(connection2));
+
+        slot1.free();
+        assertFalse(_registry.isRegistered(connection1));
+        assertTrue(_registry.hasBeenRegistered(connection1));
+
+        slot2.free();
+        assertFalse(_registry.isRegistered(connection2));
+        assertTrue(_registry.hasBeenRegistered(connection2));
+    }
+
+    @Test
+    public void testNewConnectionWithPolicy_ClosedExisting()
+    {
+        final AMQPConnection_1_0<?> connection1 = newConnection("C", 
SoleConnectionEnforcementPolicy.REFUSE_CONNECTION);
+        Mockito.doReturn(false).when(connection1).isClosing();
+        final ConnectionSlot slot1 = _limiter.register(connection1);
+        assertTrue(_registry.isRegistered(connection1));
+
+        Mockito.doReturn(true).when(connection1).isClosing();
+        final AMQPConnection_1_0<?> connection2 = newConnection("C", null);
+        final ConnectionSlot slot2 = _limiter.register(connection2);
+        assertTrue(_registry.isRegistered(connection2));
+
+        final AMQPConnection_1_0<?> connection3 = newConnection("C", 
SoleConnectionEnforcementPolicy.CLOSE_EXISTING);
+        try
+        {
+            _limiter.register(connection3);
+            fail("A sole connection enforcement policy exception is expected");
+        }
+        catch (SoleConnectionEnforcementPolicyException e)
+        {
+            assertEquals(
+                    "Single connection is required due to 
sole-connection-enforcement-policy 'close-existing'",
+                    e.getMessage());
+
+            assertEquals(1, e.getExistingConnections().size());
+            assertTrue(e.getExistingConnections().contains(connection2));
+            assertEquals(SoleConnectionEnforcementPolicy.CLOSE_EXISTING, 
e.getPolicy());
+        }
+
+        slot2.free();
+        assertFalse(_registry.isRegistered(connection2));
+        assertTrue(_registry.hasBeenRegistered(connection2));
+
+        slot1.free();
+        assertFalse(_registry.isRegistered(connection1));
+        assertTrue(_registry.hasBeenRegistered(connection1));
+    }
+
+    @Test
+    public void testNewConnectionWithPolicy2_ClosedExisting()
+    {
+        final AMQPConnection_1_0<?> connection1 = newConnection("C", 
SoleConnectionEnforcementPolicy.REFUSE_CONNECTION);
+        Mockito.doReturn(false).when(connection1).isClosing();
+        final ConnectionSlot slot1 = _limiter.register(connection1);
+        assertTrue(_registry.isRegistered(connection1));
+
+        Mockito.doReturn(true).when(connection1).isClosing();
+        final AMQPConnection_1_0<?> connection2 = newConnection("C", 
SoleConnectionEnforcementPolicy.REFUSE_CONNECTION);
+        final ConnectionSlot slot2 = _limiter.register(connection2);
+        assertTrue(_registry.isRegistered(connection2));
+
+        final AMQPConnection_1_0<?> connection3 = newConnection("C", 
SoleConnectionEnforcementPolicy.CLOSE_EXISTING);
+        try
+        {
+            _limiter.register(connection3);
+            fail("A sole connection enforcement policy exception is expected");
+        }
+        catch (SoleConnectionEnforcementPolicyException e)
+        {
+            assertEquals(
+                    "Single connection is required due to 
sole-connection-enforcement-policy 'refuse-connection'",
+                    e.getMessage());
+
+            assertEquals(1, e.getExistingConnections().size());
+            assertTrue(e.getExistingConnections().contains(connection2));
+            assertEquals(SoleConnectionEnforcementPolicy.REFUSE_CONNECTION, 
e.getPolicy());
+        }
+
+        slot2.free();
+        assertFalse(_registry.isRegistered(connection2));
+        assertTrue(_registry.hasBeenRegistered(connection2));
+
+        slot1.free();
+        assertFalse(_registry.isRegistered(connection1));
+        assertTrue(_registry.hasBeenRegistered(connection1));
+    }
+
+    @Test
+    public void testAnotherConnectionType()
+    {
+        final AMQPConnection<?> connection = 
Mockito.mock(AMQPConnection.class);
+        final ConnectionSlot slot = _limiter.register(connection);
+        assertTrue(_registry.isRegistered(connection));
+        slot.free();
+        assertFalse(_registry.isRegistered(connection));
+        assertTrue(_registry.hasBeenRegistered(connection));
+        Mockito.verifyNoInteractions(connection);
+    }
+
+    @Test
+    public void testMultipleIndependentConnections()
+    {
+        final AMQPConnection_1_0<?> connection1 = newConnection("C1", null);
+        final ConnectionSlot slot1 = _limiter.register(connection1);
+        assertTrue(_registry.isRegistered(connection1));
+
+        final AMQPConnection_1_0<?> connection2 = newConnection("C2", 
SoleConnectionEnforcementPolicy.REFUSE_CONNECTION);
+        final ConnectionSlot slot2 = _limiter.register(connection2);
+        assertTrue(_registry.isRegistered(connection2));
+
+        final AMQPConnection_1_0<?> connection3 = newConnection("C3", 
SoleConnectionEnforcementPolicy.CLOSE_EXISTING);
+        final ConnectionSlot slot3 = _limiter.register(connection3);
+        assertTrue(_registry.isRegistered(connection3));
+
+        slot3.free();
+        assertFalse(_registry.isRegistered(connection3));
+        assertTrue(_registry.hasBeenRegistered(connection3));
+
+        slot2.free();
+        assertFalse(_registry.isRegistered(connection2));
+        assertTrue(_registry.hasBeenRegistered(connection2));
+
+        slot1.free();
+        assertFalse(_registry.isRegistered(connection1));
+        assertTrue(_registry.hasBeenRegistered(connection1));
+    }
+
+    @Test
+    public void testMultipleIndependentConnections2()
+    {
+        final AMQPConnection_1_0<?> connection1 = newConnection(null, null);
+        final ConnectionSlot slot1 = _limiter.register(connection1);
+        assertTrue(_registry.isRegistered(connection1));
+
+        final AMQPConnection_1_0<?> connection2 = newConnection(null, null);
+        final ConnectionSlot slot2 = _limiter.register(connection2);
+        assertTrue(_registry.isRegistered(connection2));
+
+        final AMQPConnection_1_0<?> connection3 = newConnection(null, null);
+        final ConnectionSlot slot3 = _limiter.register(connection3);
+        assertTrue(_registry.isRegistered(connection3));
+
+        slot3.free();
+        assertFalse(_registry.isRegistered(connection3));
+        assertTrue(_registry.hasBeenRegistered(connection3));
+
+        slot2.free();
+        assertFalse(_registry.isRegistered(connection2));
+        assertTrue(_registry.hasBeenRegistered(connection2));
+
+        slot1.free();
+        assertFalse(_registry.isRegistered(connection1));
+        assertTrue(_registry.hasBeenRegistered(connection1));
+    }
+
+    private AMQPConnection_1_0<?> newConnection(String id, 
SoleConnectionEnforcementPolicy policy)
+    {
+        final AMQPConnection_1_0<?> connection = 
Mockito.mock(AMQPConnection_1_0.class);
+        Mockito.doReturn(id).when(connection).getRemoteContainerId();
+        
Mockito.doReturn(policy).when(connection).getSoleConnectionEnforcementPolicy();
+        return connection;
+    }
+
+    static final class Registry implements ConnectionLimiter
+    {
+        private final Set<AMQPConnection<?>> _registered;
+
+        private final Set<AMQPConnection<?>> _connections;
+
+        private final ConnectionLimiter _subLimiter;
+
+        public Registry()
+        {
+            _registered = new HashSet<>();
+            _connections = new HashSet<>();
+            _subLimiter = ConnectionLimiter.noLimits();
+        }
+
+        private Registry(Registry limiter, ConnectionLimiter subLimiter)
+        {
+            _registered = limiter._registered;
+            _connections = limiter._connections;
+            _subLimiter = Objects.requireNonNull(subLimiter);
+        }
+
+        @Override
+        public ConnectionSlot register(final AMQPConnection<?> connection)
+        {
+            final ConnectionSlot slot = _subLimiter.register(connection);
+            _registered.add(connection);
+            _connections.add(connection);
+            return slot.chainTo(() -> _connections.remove(connection));
+        }
+
+        @Override
+        public ConnectionLimiter append(ConnectionLimiter limiter)
+        {
+            return new Registry(this, _subLimiter.append(limiter));
+        }
+
+        public boolean isRegistered(AMQPConnection<?> connection)
+        {
+            return _connections.contains(connection);
+        }
+
+        public boolean hasBeenRegistered(AMQPConnection<?> connection)
+        {
+            return _registered.contains(connection);
+        }
+    }
+}
diff --git 
a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
 
b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
index 0e83704..d3fd813 100644
--- 
a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
+++ 
b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
@@ -68,7 +68,6 @@ import org.apache.qpid.server.transport.AMQPConnection;
 import org.apache.qpid.server.txn.DtxNotSupportedException;
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.util.Action;
-import org.apache.qpid.server.virtualhost.ConnectionEstablishmentPolicy;
 import org.apache.qpid.server.virtualhost.LinkRegistryFactory;
 import org.apache.qpid.server.virtualhost.LinkRegistryModel;
 import org.apache.qpid.server.virtualhost.VirtualHostPropertiesNode;
@@ -195,11 +194,9 @@ public class ManagementAddressSpace implements 
NamedAddressSpace
     }
 
     @Override
-    public boolean registerConnection(final AMQPConnection<?> connection,
-                                      final ConnectionEstablishmentPolicy 
connectionEstablishmentPolicy)
+    public void registerConnection(final AMQPConnection<?> connection)
     {
         _connections.add(connection);
-        return true;
     }
 
     @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to