https://issues.apache.org/jira/browse/AMQ-5889

Adds support for auto-detection of wire protocols over a transport.
OpenWire, AMQP, STOMP, and MQTT can all be detected, and the broker
will properly handle each one over a given transport.  Currently the
auto TCP, NIO, SSL, and NIO+SSL transports can auto-detect the wire
format and client, but support could be added in the future for
other transports such as WebSockets.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/04ee70a1
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/04ee70a1
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/04ee70a1

Branch: refs/heads/master
Commit: 04ee70a161b463f69692debea623d754a5c781c2
Parents: 9f50ce3
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Thu Jul 16 18:23:24 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Tue Aug 11 19:39:29 2015 +0000

----------------------------------------------------------------------
 activemq-amqp/pom.xml                           |  21 ++
 .../transport/amqp/AmqpNioSslTransport.java     |  43 ++-
 .../amqp/AmqpNioSslTransportFactory.java        |  10 +
 .../transport/amqp/AmqpNioTransport.java        |  33 +-
 .../transport/amqp/AmqpNioTransportFactory.java |   7 +
 .../transport/amqp/AmqpTestSupport.java         |  53 ++++
 .../amqp/auto/JMSClientAutoNioPlusSslTest.java  |  51 ++++
 .../amqp/auto/JMSClientAutoNioTest.java         |  51 ++++
 .../amqp/auto/JMSClientAutoPlusSslTest.java     |  51 ++++
 .../transport/amqp/auto/JMSClientAutoTest.java  |  50 ++++
 activemq-broker/pom.xml                         |  21 ++
 .../activemq/broker/TransportConnection.java    |   1 +
 .../transport/auto/AutoSslTransportFactory.java | 117 ++++++++
 .../transport/auto/AutoSslTransportServer.java  | 149 ++++++++++
 .../transport/auto/AutoTcpTransportFactory.java | 110 +++++++
 .../transport/auto/AutoTcpTransportServer.java  | 298 +++++++++++++++++++
 .../transport/auto/AutoTransportUtils.java      |  62 ++++
 .../auto/nio/AutoNIOSSLTransportServer.java     | 122 ++++++++
 .../transport/auto/nio/AutoNIOTransport.java    |  85 ++++++
 .../auto/nio/AutoNioSslTransportFactory.java    | 131 ++++++++
 .../auto/nio/AutoNioTransportFactory.java       | 114 +++++++
 .../protocol/AmqpProtocolVerifier.java          |  36 +++
 .../protocol/MqttProtocolVerifier.java          |  45 +++
 .../protocol/OpenWireProtocolVerifier.java      |  67 +++++
 .../transport/protocol/ProtocolVerifier.java    |   9 +
 .../protocol/StompProtocolVerifier.java         |  39 +++
 .../transport/nio/AutoInitNioSSLTransport.java  | 234 +++++++++++++++
 .../services/org/apache/activemq/transport/auto |  17 ++
 .../org/apache/activemq/transport/auto+nio      |  17 ++
 .../org/apache/activemq/transport/auto+nio+ssl  |  17 ++
 .../org/apache/activemq/transport/auto+ssl      |  17 ++
 activemq-camel/pom.xml                          |  22 +-
 activemq-client/pom.xml                         |  21 ++
 .../activemq/ActiveMQConnectionFactory.java     |  18 +-
 .../activemq/transport/TransportFactory.java    |   7 +
 .../activemq/transport/nio/NIOSSLTransport.java | 109 +++++--
 .../transport/nio/NIOSSLTransportFactory.java   |  16 +
 .../transport/nio/NIOSSLTransportServer.java    |   2 +-
 .../activemq/transport/nio/NIOTransport.java    |  16 +-
 .../transport/nio/NIOTransportFactory.java      |  20 ++
 .../activemq/transport/tcp/SslTransport.java    |  22 +-
 .../transport/tcp/TcpBufferedInputStream.java   |  24 +-
 .../activemq/transport/tcp/TcpTransport.java    |  65 +++-
 .../transport/tcp/TcpTransportFactory.java      |  13 +
 .../transport/tcp/TcpTransportServer.java       |  44 ++-
 activemq-http/pom.xml                           |  22 +-
 activemq-jaas/pom.xml                           |  21 ++
 activemq-jms-pool/pom.xml                       |  21 ++
 activemq-kahadb-store/pom.xml                   |  23 +-
 activemq-karaf-itest/pom.xml                    |  21 ++
 activemq-leveldb-store/pom.xml                  |  21 ++
 activemq-mqtt/pom.xml                           |  21 ++
 .../transport/mqtt/MQTTNIOSSLTransport.java     |  23 +-
 .../mqtt/MQTTNIOSSLTransportFactory.java        |  12 +
 .../transport/mqtt/MQTTNIOTransport.java        |  34 ++-
 .../transport/mqtt/MQTTNIOTransportFactory.java |   7 +
 .../transport/mqtt/auto/MQTTAutoNioSslTest.java |  35 +++
 .../transport/mqtt/auto/MQTTAutoNioTest.java    |  35 +++
 .../transport/mqtt/auto/MQTTAutoSslTest.java    |  35 +++
 .../transport/mqtt/auto/MQTTAutoTest.java       |  35 +++
 activemq-partition/pom.xml                      |  21 ++
 activemq-pool/pom.xml                           |  24 ++
 activemq-ra/pom.xml                             |  21 ++
 activemq-runtime-config/pom.xml                 |  21 ++
 activemq-spring/pom.xml                         |  21 ++
 activemq-stomp/pom.xml                          |  21 ++
 .../transport/stomp/StompNIOSSLTransport.java   |  24 +-
 .../stomp/StompNIOSSLTransportFactory.java      |  10 +
 .../transport/stomp/StompNIOTransport.java      |  38 ++-
 .../stomp/StompNIOTransportFactory.java         |  13 +
 .../transport/stomp/StompTestSupport.java       |  56 +++-
 .../stomp/auto/StompAutoNioSslTest.java         |  44 +++
 .../transport/stomp/auto/StompAutoNioTest.java  |  40 +++
 .../transport/stomp/auto/StompAutoSslTest.java  |  44 +++
 .../transport/stomp/auto/StompAutoTest.java     |  40 +++
 activemq-unit-tests/pom.xml                     |  21 ++
 .../auto/AutoNIOSslTransportBrokerTest.java     |  67 +++++
 .../auto/AutoNIOTransportBrokerTest.java        |  38 +++
 .../auto/AutoSslTransportBrokerTest.java        |  64 ++++
 .../transport/auto/AutoTransportBrokerTest.java |  38 +++
 .../auto/AutoTransportConfigureTest.java        | 156 ++++++++++
 .../auto/failover/AutoFailoverClusterTest.java  |  27 ++
 .../auto/failover/AutoFailoverTimeoutTest.java  |  27 ++
 .../auto/failover/FailoverAutoRandomTest.java   |  28 ++
 .../FailoverAutoTransportBrokerTest.java        |  43 +++
 .../AutoNIOJmsDurableTopicSendReceiveTest.java  |  29 ++
 .../auto/nio/AutoNIOJmsSendAndReceiveTest.java  |  32 ++
 .../AutoNIOPersistentSendAndReceiveTest.java    |  27 ++
 .../transport/auto/nio/AutoNIOSSLBasicTest.java |  27 ++
 .../transport/failover/FailoverClusterTest.java |  12 +-
 .../transport/failover/FailoverRandomTest.java  |  22 +-
 .../transport/failover/FailoverTimeoutTest.java |   4 +
 .../activemq/transport/nio/NIOSSLBasicTest.java |  10 +-
 .../nio/NIOSSLTransportBrokerTest.java          |   4 +-
 activemq-web-demo/pom.xml                       |  21 ++
 activemq-web/pom.xml                            |  21 ++
 assembly/pom.xml                                |  21 ++
 97 files changed, 3912 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml
index 4d3c8ef..5f10825 100644
--- a/activemq-amqp/pom.xml
+++ b/activemq-amqp/pom.xml
@@ -205,6 +205,27 @@
         </plugins>
       </build>
     </profile>
+    <profile>
+      <id>activemq.tests-autoTransport</id>
+      <activation>
+        <property>
+          <name>activemq.tests</name>
+          <value>autoTransport</value>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <includes>
+                <include>**/auto/*Test.java</include>
+              </includes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
 
       <profile>
           <id>activemq.tests.windows.excludes</id>

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
index 3276be9..88d9284 100644
--- 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java
@@ -23,6 +23,7 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
 import javax.net.SocketFactory;
+import javax.net.ssl.SSLEngine;
 
 import org.apache.activemq.transport.nio.NIOSSLTransport;
 import org.apache.activemq.wireformat.WireFormat;
@@ -38,7 +39,14 @@ public class AmqpNioSslTransport extends NIOSSLTransport {
     }
 
     public AmqpNioSslTransport(WireFormat wireFormat, Socket socket) throws 
IOException {
-        super(wireFormat, socket);
+        super(wireFormat, socket, null, null, null);
+
+        frameReader.setWireFormat((AmqpWireFormat) wireFormat);
+    }
+
+    public AmqpNioSslTransport(WireFormat wireFormat, Socket socket,
+            SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer) 
throws IOException {
+        super(wireFormat, socket, engine, initBuffer, inputBuffer);
 
         frameReader.setWireFormat((AmqpWireFormat) wireFormat);
     }
@@ -55,4 +63,37 @@ public class AmqpNioSslTransport extends NIOSSLTransport {
     protected void processCommand(ByteBuffer plain) throws Exception {
         frameReader.parse(plain);
     }
+
+    /* (non-Javadoc)
+     * @see 
org.apache.activemq.transport.nio.NIOSSLTransport#secureRead(java.nio.ByteBuffer)
+     */
+
+    @Override
+    protected void doInit() {
+        if (initBuffer != null) {
+            nextFrameSize = -1;
+            serviceRead();
+
+        }
+    }
+
+    @Override
+    protected int secureRead(ByteBuffer plain) throws Exception {
+        if (initBuffer != null) {
+            initBuffer.buffer.flip();
+            if (initBuffer.buffer.hasRemaining()) {
+                plain.flip();
+                for (int i =0; i < 8; i++) {
+                    plain.put(initBuffer.buffer.get());
+                }
+                plain.flip();
+                processCommand(plain);
+                initBuffer.buffer.clear();
+                return 8;
+            }
+        }
+        return super.secureRead(plain);
+    }
+
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java
index 5e2fa06..7858a56 100644
--- 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java
@@ -21,15 +21,18 @@ import java.net.Socket;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 
 import javax.net.ServerSocketFactory;
 import javax.net.SocketFactory;
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
 
 import org.apache.activemq.broker.SslContext;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
 import org.apache.activemq.transport.tcp.TcpTransportServer;
 import org.apache.activemq.wireformat.WireFormat;
 
@@ -62,6 +65,13 @@ public class AmqpNioSslTransportFactory extends 
AmqpNioTransportFactory {
     }
 
     @Override
+    public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
+            SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer)
+            throws IOException {
+        return new AmqpNioSslTransport(wireFormat, socket, engine, initBuffer, 
inputBuffer);
+    }
+
+    @Override
     public TransportServer doBind(URI location) throws IOException {
         if (SslContext.getCurrentSslContext() != null) {
             try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
index 21d40eb..dfa1569 100644
--- 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.amqp;
 
+import java.io.ByteArrayInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -63,6 +64,12 @@ public class AmqpNioTransport extends TcpTransport {
         frameReader.setWireFormat((AmqpWireFormat) wireFormat);
     }
 
+    public AmqpNioTransport(WireFormat wireFormat, Socket socket, InitBuffer 
initBuffer) throws IOException {
+        super(wireFormat, socket, initBuffer);
+
+        frameReader.setWireFormat((AmqpWireFormat) wireFormat);
+    }
+
     @Override
     protected void initializeStreams() throws IOException {
         channel = socket.getChannel();
@@ -91,6 +98,17 @@ public class AmqpNioTransport extends TcpTransport {
         NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
         this.dataOut = new DataOutputStream(outPutStream);
         this.buffOut = outPutStream;
+
+        try {
+            if (initBuffer != null) {
+                processBuffer(initBuffer.buffer, initBuffer.readSize);
+            }
+        } catch (IOException e) {
+            onException(e);
+        } catch (Throwable e) {
+            onException(IOExceptionSupport.create(e));
+        }
+
     }
 
     boolean magicRead = false;
@@ -101,6 +119,7 @@ public class AmqpNioTransport extends TcpTransport {
             while (isStarted()) {
                 // read channel
                 int readSize = channel.read(inputBuffer);
+
                 // channel is closed, cleanup
                 if (readSize == -1) {
                     onException(new EOFException());
@@ -112,11 +131,7 @@ public class AmqpNioTransport extends TcpTransport {
                     break;
                 }
 
-                receiveCounter += readSize;
-
-                inputBuffer.flip();
-                frameReader.parse(inputBuffer);
-                inputBuffer.clear();
+                processBuffer(inputBuffer, readSize);
             }
         } catch (IOException e) {
             onException(e);
@@ -125,6 +140,14 @@ public class AmqpNioTransport extends TcpTransport {
         }
     }
 
+    protected void processBuffer(ByteBuffer buffer, int readSize) throws 
Exception {
+        receiveCounter += readSize;
+
+        buffer.flip();
+        frameReader.parse(buffer);
+        buffer.clear();
+    }
+
     @Override
     protected void doStart() throws Exception {
         connect();

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
index d54d58f..4b6d9bd 100644
--- 
a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
+++ 
b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java
@@ -33,6 +33,7 @@ import org.apache.activemq.transport.MutexTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.nio.NIOTransportFactory;
 import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
 import org.apache.activemq.transport.tcp.TcpTransportServer;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.wireformat.WireFormat;
@@ -64,6 +65,12 @@ public class AmqpNioTransportFactory extends 
NIOTransportFactory implements Brok
         return new AmqpNioTransport(wf, socketFactory, location, 
localLocation);
     }
 
+    @Override
+    public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
+            InitBuffer initBuffer) throws IOException {
+        return new AmqpNioTransport(wireFormat, socket, initBuffer);
+    }
+
     @SuppressWarnings("rawtypes")
     @Override
     public Transport serverConfigure(Transport transport, WireFormat format, 
HashMap options) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index 91909d4..45af469 100644
--- 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -79,6 +79,15 @@ public class AmqpTestSupport {
     protected URI amqpNioPlusSslURI;
     protected int amqpNioPlusSslPort;
 
+    protected URI autoURI;
+    protected int autoPort;
+    protected URI autoSslURI;
+    protected int autoSslPort;
+    protected URI autoNioURI;
+    protected int autoNioPort;
+    protected URI autoNioPlusSslURI;
+    protected int autoNioPlusSslPort;
+
     protected URI openwireURI;
     protected int openwirePort;
 
@@ -176,6 +185,34 @@ public class AmqpTestSupport {
             amqpNioPlusSslURI = connector.getPublishableConnectURI();
             LOG.debug("Using amqp+nio+ssl port " + amqpNioPlusSslPort);
         }
+        if (isUseAutoConnector()) {
+            connector = brokerService.addConnector(
+                "auto://0.0.0.0:" + autoPort + getAdditionalConfig());
+            autoPort = connector.getConnectUri().getPort();
+            autoURI = connector.getPublishableConnectURI();
+            LOG.debug("Using auto port " + autoPort);
+        }
+        if (isUseAutoSslConnector()) {
+            connector = brokerService.addConnector(
+                "auto+ssl://0.0.0.0:" + autoSslPort + getAdditionalConfig());
+            autoSslPort = connector.getConnectUri().getPort();
+            autoSslURI = connector.getPublishableConnectURI();
+            LOG.debug("Using auto+ssl port " + autoSslPort);
+        }
+        if (isUseAutoNioConnector()) {
+            connector = brokerService.addConnector(
+                "auto+nio://0.0.0.0:" + autoNioPort + getAdditionalConfig());
+            autoNioPort = connector.getConnectUri().getPort();
+            autoNioURI = connector.getPublishableConnectURI();
+            LOG.debug("Using auto+nio port " + autoNioPort);
+        }
+        if (isUseAutoNioPlusSslConnector()) {
+            connector = brokerService.addConnector(
+                "auto+nio+ssl://0.0.0.0:" + autoNioPlusSslPort + 
getAdditionalConfig());
+            autoNioPlusSslPort = connector.getConnectUri().getPort();
+            autoNioPlusSslURI = connector.getPublishableConnectURI();
+            LOG.debug("Using auto+nio+ssl port " + autoNioPlusSslPort);
+        }
     }
 
     protected boolean isPersistent() {
@@ -206,6 +243,22 @@ public class AmqpTestSupport {
         return false;
     }
 
+    protected boolean isUseAutoConnector() {
+        return false;
+    }
+
+    protected boolean isUseAutoSslConnector() {
+        return false;
+    }
+
+    protected boolean isUseAutoNioConnector() {
+        return false;
+    }
+
+    protected boolean isUseAutoNioPlusSslConnector() {
+        return false;
+    }
+
     protected String getAmqpTransformer() {
         return "jms";
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioPlusSslTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioPlusSslTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioPlusSslTest.java
new file mode 100644
index 0000000..0bf2e38
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioPlusSslTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.auto;
+
+import java.net.URI;
+
+import org.apache.activemq.transport.amqp.JMSClientNioPlusSslTest;
+import org.apache.activemq.transport.amqp.JMSClientSslTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the JMS client when connected to the auto+nio+ssl transport.
+ */
+public class JMSClientAutoNioPlusSslTest extends JMSClientSslTest {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(JMSClientAutoNioPlusSslTest.class);
+
+    @Override
+    protected URI getBrokerURI() {
+        return autoNioPlusSslURI;
+    }
+
+    @Override
+    protected boolean isUseTcpConnector() {
+        return false;
+    }
+
+    @Override
+    protected boolean isUseAutoNioPlusSslConnector() {
+        return true;
+    }
+
+    @Override
+    protected String getTargetConnectorName() {
+        return "auto+nio+ssl";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioTest.java
new file mode 100644
index 0000000..5346a79
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.auto;
+
+import java.net.URI;
+
+import org.apache.activemq.transport.amqp.JMSClientNioTest;
+import org.apache.activemq.transport.amqp.JMSClientTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the JMS client when connected to the auto+nio transport.
+ */
+public class JMSClientAutoNioTest extends JMSClientTest {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(JMSClientAutoNioTest.class);
+
+    @Override
+    protected URI getBrokerURI() {
+        return autoNioURI;
+    }
+
+    @Override
+    protected boolean isUseTcpConnector() {
+        return false;
+    }
+
+    @Override
+    protected boolean isUseAutoNioConnector() {
+        return true;
+    }
+
+    @Override
+    protected String getTargetConnectorName() {
+        return "auto+nio";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoPlusSslTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoPlusSslTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoPlusSslTest.java
new file mode 100644
index 0000000..deb90fb
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoPlusSslTest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.auto;
+
+import java.net.URI;
+
+import org.apache.activemq.transport.amqp.JMSClientNioPlusSslTest;
+import org.apache.activemq.transport.amqp.JMSClientSslTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the JMS client when connected to the auto+ssl transport.
+ */
+public class JMSClientAutoPlusSslTest extends JMSClientSslTest {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(JMSClientAutoPlusSslTest.class);
+
+    @Override
+    protected URI getBrokerURI() {
+        return autoSslURI;
+    }
+
+    @Override
+    protected boolean isUseTcpConnector() {
+        return false;
+    }
+
+    @Override
+    protected boolean isUseAutoSslConnector() {
+        return true;
+    }
+
+    @Override
+    protected String getTargetConnectorName() {
+        return "auto+ssl";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoTest.java
 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoTest.java
new file mode 100644
index 0000000..77683e4
--- /dev/null
+++ 
b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.auto;
+
+import java.net.URI;
+
+import org.apache.activemq.transport.amqp.JMSClientTest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test the JMS client when connected to the auto transport.
+ */
+public class JMSClientAutoTest extends JMSClientTest {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(JMSClientAutoTest.class);
+
+    @Override
+    protected boolean isUseTcpConnector() {
+        return false;
+    }
+
+    @Override
+    protected boolean isUseAutoConnector() {
+        return true;
+    }
+
+    @Override
+    protected URI getBrokerURI() {
+        return autoURI;
+    }
+
+    @Override
+    protected String getTargetConnectorName() {
+        return "auto";
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-broker/pom.xml b/activemq-broker/pom.xml
index bc35400..5f8545b 100755
--- a/activemq-broker/pom.xml
+++ b/activemq-broker/pom.xml
@@ -238,5 +238,26 @@
         </plugins>
       </build>
     </profile>
+       <profile>
+      <id>activemq.tests-autoTransport</id>
+      <activation>
+        <property>
+          <name>activemq.tests</name>
+          <value>autoTransport</value>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 2727503..a9d36b5 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -239,6 +239,7 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
         }
         if (!stopping.get() && !pendingStop) {
             transportException.set(e);
+            e.printStackTrace();
             if (TRANSPORTLOG.isDebugEnabled()) {
                 TRANSPORTLOG.debug(this + " failed: " + e, e);
             } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportFactory.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportFactory.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportFactory.java
new file mode 100644
index 0000000..7c65311
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportFactory.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.transport.auto;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLServerSocketFactory;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.tcp.SslTransportFactory;
+import org.apache.activemq.transport.tcp.SslTransportServer;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class AutoSslTransportFactory extends SslTransportFactory implements 
BrokerServiceAware  {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AutoSslTransportFactory.class);
+
+
+    protected BrokerService brokerService;
+    /* (non-Javadoc)
+     * @see 
org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService)
+     */
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
+
+    private Set<String> enabledProtocols;
+
+    /**
+     * Overriding to use AutoSslTransportServer and allow for proper reflection.
+     */
+    @Override
+    public TransportServer doBind(final URI location) throws IOException {
+        try {
+            Map<String, String> options = new HashMap<String, 
String>(URISupport.parseParameters(location));
+
+            Map<String, Object> autoProperties = 
IntrospectionSupport.extractProperties(options, "auto.");
+            this.enabledProtocols = AutoTransportUtils.parseProtocols((String) 
autoProperties.get("protocols"));
+
+            ServerSocketFactory serverSocketFactory = 
createServerSocketFactory();
+            AutoSslTransportServer server = 
createAutoSslTransportServer(location, 
(SSLServerSocketFactory)serverSocketFactory);
+            if (options.get("allowLinkStealing") != null){
+                allowLinkStealingSet = true;
+            }
+            IntrospectionSupport.setProperties(server, options);
+            
server.setAutoTransportOptions(IntrospectionSupport.extractProperties(options, 
"auto."));
+            
server.setTransportOption(IntrospectionSupport.extractProperties(options, 
"transport."));
+            
server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options));
+            server.bind();
+
+            return server;
+        } catch (URISyntaxException e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    boolean allowLinkStealingSet = false;
+
+    /**
+     * Allows subclasses of AutoSslTransportFactory to create custom instances of
+     * AutoSslTransportServer.
+     *
+     * @param location
+     * @param serverSocketFactory
+     * @return
+     * @throws IOException
+     * @throws URISyntaxException
+     */
+   // @Override
+    protected AutoSslTransportServer createAutoSslTransportServer(final URI 
location, SSLServerSocketFactory serverSocketFactory) throws IOException, 
URISyntaxException {
+        AutoSslTransportServer server = new AutoSslTransportServer(this, 
location, serverSocketFactory,
+                this.brokerService, enabledProtocols) {
+            @Override
+            protected TcpTransport createTransport(Socket socket, WireFormat 
format)
+                    throws IOException {
+                if (format.getClass().toString().contains("MQTT") && 
!allowLinkStealingSet) {
+                    this.setAllowLinkStealing(true);
+                }
+                return super.createTransport(socket, format);
+            }
+        };
+        return server;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportServer.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportServer.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportServer.java
new file mode 100644
index 0000000..9954523
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportServer.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.transport.auto;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Set;
+
+import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocket;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.tcp.SslTransport;
+import org.apache.activemq.transport.tcp.SslTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ *  An SSL TransportServer that auto-detects the wire protocol of each connection.
+ *
+ *  Allows for client certificate authentication (refer to setNeedClientAuth 
for
+ *      details).
+ *  NOTE: Client certificate authentication is disabled by default.
+ *
+ */
+public class AutoSslTransportServer extends AutoTcpTransportServer {
+
+
+
+    // Specifies if sockets created from this server should needClientAuth.
+    private boolean needClientAuth;
+
+    // Specifies if sockets created from this server should wantClientAuth.
+    private boolean wantClientAuth;
+
+//    /**
+//     * Creates a ssl transport server for the specified url using the 
provided
+//     * serverSocketFactory
+//     *
+//     * @param transportFactory The factory used to create transports when 
connections arrive.
+//     * @param location The location of the broker to bind to.
+//     * @param serverSocketFactory The factory used to create this server.
+//     * @throws IOException passed up from TcpTransportFactory.
+//     * @throws URISyntaxException passed up from TcpTransportFactory.
+//     */
+//    public SslTransportServer(SslTransportFactory transportFactory, URI 
location, SSLServerSocketFactory serverSocketFactory) throws IOException, 
URISyntaxException {
+//        super(transportFactory, location, serverSocketFactory);
+//    }
+
+    public AutoSslTransportServer(SslTransportFactory transportFactory,
+            URI location, SSLServerSocketFactory serverSocketFactory,
+            BrokerService brokerService, Set<String> enabledProtocols) throws 
IOException, URISyntaxException {
+        super(transportFactory, location, serverSocketFactory, brokerService, 
enabledProtocols);
+        // Client-auth settings are applied to the server socket later, in bind().
+    }
+
+    /**
+     * Sets whether client authentication should be required
+     * Must be called before {@link #bind()}
+     * Note: Calling this method clears the wantClientAuth flag
+     * in the underlying implementation.
+     */
+    public void setNeedClientAuth(boolean needAuth) {
+        this.needClientAuth = needAuth;
+    }
+
+    /**
+     * Returns whether client authentication should be required.
+     */
+    public boolean getNeedClientAuth() {
+        return this.needClientAuth;
+    }
+
+    /**
+     * Returns whether client authentication should be requested.
+     */
+    public boolean getWantClientAuth() {
+        return this.wantClientAuth;
+    }
+
+    /**
+     * Sets whether client authentication should be requested.
+     * Must be called before {@link #bind()}
+     * Note: Calling this method clears the needClientAuth flag
+     * in the underlying implementation.
+     */
+    public void setWantClientAuth(boolean wantAuth) {
+        this.wantClientAuth = wantAuth;
+    }
+
+    /**
+     * Binds this socket to the previously specified URI.
+     *
+     * Overridden to allow for proper handling of needClientAuth.
+     *
+     * @throws IOException passed up from TcpTransportServer.
+     */
+    @Override
+    public void bind() throws IOException {
+        super.bind();
+        if (needClientAuth) {
+            ((SSLServerSocket)this.serverSocket).setNeedClientAuth(true);
+        } else if (wantClientAuth) {
+            ((SSLServerSocket)this.serverSocket).setWantClientAuth(true);
+        }
+    }
+
+    /**
+     * Used to create Transports for this server.
+     *
+     * Overridden to allow the use of SslTransports (instead of TcpTransports).
+     *
+     * @param socket The incoming socket that will be wrapped into the new 
Transport.
+     * @param format The WireFormat being used.
+     * @return The newly created (SSL) Transport.
+     * @throws IOException
+     */
+    @Override
+    protected TcpTransport createTransport(Socket socket, WireFormat format) 
throws IOException {
+        return new SslTransport(format, (SSLSocket)socket, this.initBuffer);
+    }
+
+    @Override
+    public boolean isSslServer() {
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportFactory.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportFactory.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportFactory.java
new file mode 100644
index 0000000..5731b85
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportFactory.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.transport.auto;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.net.ServerSocketFactory;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.util.FactoryFinder;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+
+/**
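+ * Transport factory that binds an {@link AutoTcpTransportServer}, passing it
+ * the protocol set parsed from the "auto.protocols" URI option.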
+ */
+public class AutoTcpTransportFactory extends TcpTransportFactory implements 
BrokerServiceAware {
+
+    protected BrokerService brokerService;
+    /* (non-Javadoc)
+     * @see 
org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService)
+     */
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
+
+
+    @Override
+    public TransportServer doBind(final URI location) throws IOException {
+        try {
+            Map<String, String> options = new HashMap<String, 
String>(URISupport.parseParameters(location));
+
+            Map<String, Object> autoProperties = 
IntrospectionSupport.extractProperties(options, "auto.");
+            this.enabledProtocols = AutoTransportUtils.parseProtocols((String) 
autoProperties.get("protocols"));
+
+            ServerSocketFactory serverSocketFactory = 
createServerSocketFactory();
+            AutoTcpTransportServer server = createTcpTransportServer(location, 
serverSocketFactory);
+            //server.setWireFormatFactory(createWireFormatFactory(options));
+            server.setWireFormatFactory(new OpenWireFormatFactory());
+            if (options.get("allowLinkStealing") != null){
+                allowLinkStealingSet = true;
+            }
+            IntrospectionSupport.setProperties(server, options);
+            
server.setTransportOption(IntrospectionSupport.extractProperties(options, 
"transport."));
+            
server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options));
+            server.bind();
+
+            return server;
+        } catch (URISyntaxException e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    boolean allowLinkStealingSet = false;
+    private Set<String> enabledProtocols;
+
+    @Override
+    protected AutoTcpTransportServer createTcpTransportServer(final URI 
location, ServerSocketFactory serverSocketFactory) throws IOException, 
URISyntaxException {
+        AutoTcpTransportServer server = new AutoTcpTransportServer(this, 
location, serverSocketFactory, brokerService, enabledProtocols) {
+
+            @Override
+            protected TcpTransport createTransport(Socket socket, WireFormat 
format)
+                    throws IOException {
+                if (format.getClass().toString().contains("MQTT") && 
!allowLinkStealingSet) {
+                    this.setAllowLinkStealing(true);
+                }
+                return super.createTransport(socket, format);
+            }
+
+        };
+
+        return server;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportServer.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportServer.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportServer.java
new file mode 100644
index 0000000..65a0fe5
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportServer.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.transport.auto;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.net.ServerSocketFactory;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.transport.protocol.AmqpProtocolVerifier;
+import org.apache.activemq.broker.transport.protocol.MqttProtocolVerifier;
+import org.apache.activemq.broker.transport.protocol.OpenWireProtocolVerifier;
+import org.apache.activemq.broker.transport.protocol.ProtocolVerifier;
+import org.apache.activemq.broker.transport.protocol.StompProtocolVerifier;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.util.FactoryFinder;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.apache.activemq.wireformat.WireFormatFactory;
+import org.fusesource.hawtbuf.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A TCP based {@link TransportServer} that detects each client's wire protocol from its first bytes
+ */
+public class AutoTcpTransportServer extends TcpTransportServer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AutoTcpTransportServer.class);
+
+    protected Map<String, Map<String, Object>> wireFormatOptions;
+    protected Map<String, Object> autoTransportOptions;
+    protected Set<String> enabledProtocols;
+    protected final Map<String, ProtocolVerifier> protocolVerifiers = new 
ConcurrentHashMap<String, ProtocolVerifier>();
+
+    protected BrokerService brokerService;
+
+    private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new 
FactoryFinder("META-INF/services/org/apache/activemq/transport/");
+    private final ConcurrentMap<String, TransportFactory> transportFactories = 
new ConcurrentHashMap<String, TransportFactory>();
+
+    private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new 
FactoryFinder("META-INF/services/org/apache/activemq/wireformat/");
+
+    public WireFormatFactory findWireFormatFactory(String scheme, Map<String, 
Map<String, Object>> options) throws IOException {
+        WireFormatFactory wff = null;
+        try {
+            wff = 
(WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(scheme);
+            if (options != null) {
+                IntrospectionSupport.setProperties(wff, 
options.get(AutoTransportUtils.ALL));
+                IntrospectionSupport.setProperties(wff, options.get(scheme));
+            }
+            if (wff instanceof OpenWireFormatFactory) {
+                protocolVerifiers.put(AutoTransportUtils.OPENWIRE, new 
OpenWireProtocolVerifier((OpenWireFormatFactory) wff));
+            }
+            return wff;
+        } catch (Throwable e) {
+           throw IOExceptionSupport.create("Could not create wire format 
factory for: " + scheme + ", reason: " + e, e);
+        }
+    }
+
+    public TransportFactory findTransportFactory(String scheme, Map<String, ?> 
options) throws IOException {
+        scheme = append(scheme, "nio");
+        scheme = append(scheme, "ssl");
+
+        if (scheme.isEmpty()) {
+            scheme = "tcp";
+        }
+
+        TransportFactory tf = transportFactories.get(scheme);
+        if (tf == null) {
+            // Try to load it from a META-INF property.
+            try {
+                tf = 
(TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
+                if (options != null)
+                    IntrospectionSupport.setProperties(tf, options);
+                transportFactories.put(scheme, tf);
+            } catch (Throwable e) {
+                throw IOExceptionSupport.create("Transport scheme NOT 
recognized: [" + scheme + "]", e);
+            }
+        }
+        return tf;
+    }
+
+    protected String append(String currentScheme, String scheme) {
+        if (this.getBindLocation().getScheme().contains(scheme)) {
+            if (!currentScheme.isEmpty()) {
+                currentScheme += "+";
+            }
+            currentScheme += scheme;
+        }
+        return currentScheme;
+    }
+
+    /**
+     * @param transportFactory
+     * @param location
+     * @param serverSocketFactory
+     * @throws IOException
+     * @throws URISyntaxException
+     */
+    public AutoTcpTransportServer(TcpTransportFactory transportFactory,
+            URI location, ServerSocketFactory serverSocketFactory, 
BrokerService brokerService,
+            Set<String> enabledProtocols)
+            throws IOException, URISyntaxException {
+        super(transportFactory, location, serverSocketFactory);
+        service = Executors.newCachedThreadPool();
+        this.brokerService = brokerService;
+        this.enabledProtocols = enabledProtocols;
+        initProtocolVerifiers();
+    }
+
+    @Override
+    public void setWireFormatFactory(WireFormatFactory factory) {
+        super.setWireFormatFactory(factory);
+        initOpenWireProtocolVerifier();
+    }
+
+    protected void initProtocolVerifiers() {
+        initOpenWireProtocolVerifier();
+
+        if (isAllProtocols() || 
enabledProtocols.contains(AutoTransportUtils.AMQP)) {
+            protocolVerifiers.put(AutoTransportUtils.AMQP, new 
AmqpProtocolVerifier());
+        }
+        if (isAllProtocols() || 
enabledProtocols.contains(AutoTransportUtils.STOMP)) {
+            protocolVerifiers.put(AutoTransportUtils.STOMP, new 
StompProtocolVerifier());
+        }
+        if (isAllProtocols()|| 
enabledProtocols.contains(AutoTransportUtils.MQTT)) {
+            protocolVerifiers.put(AutoTransportUtils.MQTT, new 
MqttProtocolVerifier());
+        }
+    }
+
+    protected void initOpenWireProtocolVerifier() {
+        if (isAllProtocols() || 
enabledProtocols.contains(AutoTransportUtils.OPENWIRE)) {
+            OpenWireProtocolVerifier owpv;
+            if (wireFormatFactory instanceof OpenWireFormatFactory) {
+                owpv = new OpenWireProtocolVerifier((OpenWireFormatFactory) 
wireFormatFactory);
+            } else {
+                owpv = new OpenWireProtocolVerifier(new 
OpenWireFormatFactory());
+            }
+            protocolVerifiers.put(AutoTransportUtils.OPENWIRE, owpv);
+        }
+    }
+
+    protected boolean isAllProtocols() {
+        return enabledProtocols == null || enabledProtocols.isEmpty();
+    }
+
+
+    protected final ExecutorService service;
+
+
+    /**
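+     * This holds the initial buffer that has been read to detect the protocol. NOTE(review): one field is shared by all sockets handled on the executor - confirm concurrent connections cannot overwrite it.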
+     */
+    public InitBuffer initBuffer;
+
+    @Override
+    protected void handleSocket(final Socket socket) {
+        final AutoTcpTransportServer server = this;
+
+        //This needs to be done in a new thread because
+        //the socket might be waiting on the client to send bytes
+        //doHandleSocket can't complete until the protocol can be detected
+        service.submit(new Runnable() {
+            @Override
+            public void run() {
+                server.doHandleSocket(socket);
+            }
+        });
+    }
+
+    @Override
+    protected TransportInfo configureTransport(final TcpTransportServer 
server, final Socket socket) throws Exception {
+         InputStream is = socket.getInputStream();
+
+         //We need to peek at the first 8 bytes of the buffer to detect the 
protocol
+         Buffer magic = new Buffer(8);
+         magic.readFrom(is);
+
+         ProtocolInfo protocolInfo = detectProtocol(magic.getData());
+
+         initBuffer = new InitBuffer(8, ByteBuffer.allocate(8));
+         initBuffer.buffer.put(magic.getData());
+
+         if (protocolInfo.detectedTransportFactory instanceof 
BrokerServiceAware) {
+             ((BrokerServiceAware) 
protocolInfo.detectedTransportFactory).setBrokerService(brokerService);
+         }
+
+        WireFormat format = 
protocolInfo.detectedWireFormatFactory.createWireFormat();
+        Transport transport = createTransport(socket, format, 
protocolInfo.detectedTransportFactory);
+
+        return new TransportInfo(format, transport, 
protocolInfo.detectedTransportFactory);
+    }
+
+    @Override
+    protected TcpTransport createTransport(Socket socket, WireFormat format) 
throws IOException {
+        return new TcpTransport(format, socket, this.initBuffer);
+    }
+
+    /**
+     * @param socket
+     * @param format
+     * @param detectedTransportFactory
+     * @return
+     */
+    protected TcpTransport createTransport(Socket socket, WireFormat format,
+            TcpTransportFactory detectedTransportFactory) throws IOException {
+        return createTransport(socket, format);
+    }
+
+    public void setWireFormatOptions(Map<String, Map<String, Object>> 
wireFormatOptions) {
+        this.wireFormatOptions = wireFormatOptions;
+    }
+
+    public void setEnabledProtocols(Set<String> enabledProtocols) {
+        this.enabledProtocols = enabledProtocols;
+    }
+
+    public void setAutoTransportOptions(Map<String, Object> 
autoTransportOptions) {
+        this.autoTransportOptions = autoTransportOptions;
+        if (autoTransportOptions.get("protocols") != null)
+            this.enabledProtocols = AutoTransportUtils.parseProtocols((String) 
autoTransportOptions.get("protocols"));
+    }
+
+    protected ProtocolInfo detectProtocol(byte[] buffer) throws IOException {
+        TcpTransportFactory detectedTransportFactory = transportFactory;
+        WireFormatFactory detectedWireFormatFactory = wireFormatFactory;
+
+        boolean found = false;
+        for (String scheme : protocolVerifiers.keySet()) {
+            if (protocolVerifiers.get(scheme).isProtocol(buffer)) {
+                LOG.debug("Detected " + scheme);
+                detectedWireFormatFactory = findWireFormatFactory(scheme, 
wireFormatOptions);
+
+                if (scheme.equals("default")) {
+                    scheme = "";
+                }
+
+                detectedTransportFactory = (TcpTransportFactory) 
findTransportFactory(scheme, transportOptions);
+                found = true;
+                break;
+            }
+        }
+
+        if (!found) {
+            throw new IllegalStateException("Could not detect wire format");
+        }
+
+        return new ProtocolInfo(detectedTransportFactory, 
detectedWireFormatFactory);
+
+    }
+
+    protected class ProtocolInfo {
+        public final TcpTransportFactory detectedTransportFactory;
+        public final WireFormatFactory detectedWireFormatFactory;
+
+        public ProtocolInfo(TcpTransportFactory detectedTransportFactory,
+                WireFormatFactory detectedWireFormatFactory) {
+            super();
+            this.detectedTransportFactory = detectedTransportFactory;
+            this.detectedWireFormatFactory = detectedWireFormatFactory;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTransportUtils.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTransportUtils.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTransportUtils.java
new file mode 100644
index 0000000..453292d
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTransportUtils.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.transport.auto;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.activemq.util.IntrospectionSupport;
+
+/**
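+ * Wire format and transport scheme names used by the auto transports, plus
+ * helpers for parsing their URI options.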
+ */
+public class AutoTransportUtils {
+
+    //wireformats
+    public static String ALL = "all";
+    public static String OPENWIRE = "default";
+    public static String STOMP = "stomp";
+    public static String AMQP = "amqp";
+    public static String MQTT = "mqtt";
+
+    //transports
+    public static String AUTO = "auto";
+
+    public static Map<String, Map<String, Object>> 
extractWireFormatOptions(Map<String, String> options ) {
+        Map<String, Map<String, Object>> wireFormatOptions = new HashMap<>();
+        if (options != null) {
+            wireFormatOptions.put(OPENWIRE, 
IntrospectionSupport.extractProperties(options, "wireFormat.default."));
+            wireFormatOptions.put(STOMP, 
IntrospectionSupport.extractProperties(options, "wireFormat.stomp."));
+            wireFormatOptions.put(AMQP, 
IntrospectionSupport.extractProperties(options, "wireFormat.amqp."));
+            wireFormatOptions.put(MQTT, 
IntrospectionSupport.extractProperties(options, "wireFormat.mqtt."));
+            wireFormatOptions.put(ALL, 
IntrospectionSupport.extractProperties(options, "wireFormat."));
+        }
+        return wireFormatOptions;
+    }
+
+    public static Set<String> parseProtocols(String protocolString) {
+        Set<String> protocolSet = new HashSet<>();;
+        if (protocolString != null && !protocolString.isEmpty()) {
+            protocolSet.addAll(Arrays.asList(protocolString.split(",")));
+        }
+        return protocolSet;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOSSLTransportServer.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOSSLTransportServer.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOSSLTransportServer.java
new file mode 100644
index 0000000..7ac3a97
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOSSLTransportServer.java
@@ -0,0 +1,122 @@
+package org.apache.activemq.broker.transport.auto.nio;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.transport.auto.AutoTcpTransportServer;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.nio.AutoInitNioSSLTransport;
+import org.apache.activemq.transport.nio.NIOSSLTransport;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransportServer;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transport server for NIO+SSL connections that picks the wire protocol of
+ * each connection from the first bytes received from the client.
+ * <p>
+ * For every accepted socket {@link #configureTransport} runs an
+ * {@link AutoInitNioSSLTransport} until it exposes the bytes it has read,
+ * passes those bytes to {@code detectProtocol} and then builds the transport
+ * for the detected wire format, handing over the SSL engine and the data that
+ * was already read.
+ */
+public class AutoNIOSSLTransportServer extends AutoTcpTransportServer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AutoNIOSSLTransportServer.class);
+
+    /** SSL context applied to the transports created by this server; may be null. */
+    private final SSLContext context;
+    private boolean needClientAuth;
+    private boolean wantClientAuth;
+
+    public AutoNIOSSLTransportServer(SSLContext context, TcpTransportFactory transportFactory, URI location,
+            ServerSocketFactory serverSocketFactory, BrokerService brokerService, Set<String> enabledProtocols)
+            throws IOException, URISyntaxException {
+        super(transportFactory, location, serverSocketFactory, brokerService, enabledProtocols);
+        this.context = context;
+    }
+
+    /**
+     * Creates the SSL transport for a connection whose protocol has been
+     * detected.
+     *
+     * @param socket the accepted client socket
+     * @param format the wire format created for the detected protocol
+     * @param engine the SSL engine obtained from the initial transport
+     * @param initBuffer the bytes that were read while detecting the protocol
+     * @param inputBuffer the input buffer of the initial transport
+     * @param detectedFactory the transport factory of the detected protocol;
+     *        not used by this implementation
+     * @return the configured transport
+     * @throws IOException if the transport cannot be created
+     */
+    protected Transport createTransport(Socket socket, WireFormat format, SSLEngine engine,
+            InitBuffer initBuffer, ByteBuffer inputBuffer, TcpTransportFactory detectedFactory) throws IOException {
+        final NIOSSLTransport sslTransport = new NIOSSLTransport(format, socket, engine, initBuffer, inputBuffer);
+        if (context != null) {
+            sslTransport.setSslContext(context);
+        }
+        sslTransport.setNeedClientAuth(needClientAuth);
+        sslTransport.setWantClientAuth(wantClientAuth);
+        return sslTransport;
+    }
+
+    /**
+     * Not supported: a transport of this server can only be created through
+     * the variant that receives the SSL engine and the initial buffers.
+     */
+    @Override
+    protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException {
+        throw new UnsupportedOperationException("method not supported");
+    }
+
+    @Override
+    public boolean isSslServer() {
+        return true;
+    }
+
+    public boolean isNeedClientAuth() {
+        return needClientAuth;
+    }
+
+    public void setNeedClientAuth(boolean value) {
+        needClientAuth = value;
+    }
+
+    public boolean isWantClientAuth() {
+        return wantClientAuth;
+    }
+
+    public void setWantClientAuth(boolean value) {
+        wantClientAuth = value;
+    }
+
+    /**
+     * Reads the first bytes of a new connection, detects its protocol and
+     * creates the matching wire format and transport.
+     *
+     * @param server the server that accepted the socket
+     * @param socket the accepted client socket
+     * @return the wire format, transport and transport factory selected for
+     *         the connection
+     * @throws Exception if reading the initial bytes or creating the
+     *         transport fails
+     */
+    @Override
+    protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
+
+        // The SSLEngine needs to be initialized and the handshake done before
+        // the first command can be read and the format detected.
+        final AutoInitNioSSLTransport bootstrap =
+                new AutoInitNioSSLTransport(wireFormatFactory.createWireFormat(), socket);
+        if (context != null) {
+            bootstrap.setSslContext(context);
+        }
+        bootstrap.start();
+        final SSLEngine engine = bootstrap.getSslSession();
+
+        // Keep servicing reads until the bootstrap transport exposes data.
+        // NOTE(review): no timeout or exit condition other than data arriving
+        // (or an exception) is visible here.
+        byte[] firstBytes;
+        do {
+            bootstrap.serviceRead();
+            firstBytes = bootstrap.read;
+        } while (firstBytes == null);
+
+        bootstrap.stop();
+
+        initBuffer = new InitBuffer(bootstrap.readSize, ByteBuffer.allocate(firstBytes.length));
+        initBuffer.buffer.put(firstBytes);
+
+        final ProtocolInfo detected = detectProtocol(firstBytes);
+
+        if (detected.detectedTransportFactory instanceof BrokerServiceAware) {
+            ((BrokerServiceAware) detected.detectedTransportFactory).setBrokerService(brokerService);
+        }
+
+        final WireFormat format = detected.detectedWireFormatFactory.createWireFormat();
+        final Transport transport = createTransport(socket, format, engine, initBuffer,
+                bootstrap.getInputBuffer(), detected.detectedTransportFactory);
+
+        return new TransportInfo(format, transport, detected.detectedTransportFactory);
+    }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOTransport.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOTransport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOTransport.java
new file mode 100644
index 0000000..e1b6e71
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOTransport.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.transport.auto.nio;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+
+import javax.net.SocketFactory;
+
+import org.apache.activemq.transport.nio.NIOTransport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * NIO transport that serves its first reads from an {@code InitBuffer}
+ * instead of the channel.
+ * <p>
+ * The first two calls to {@link #readFromBuffer()} each copy four bytes from
+ * the init buffer into the current buffer; every later call reads from the
+ * channel.
+ */
+public class AutoNIOTransport extends NIOTransport {
+
+    /** Number of bytes copied from the init buffer per read. */
+    private static final int INIT_CHUNK_SIZE = 4;
+
+    /** Set once the second chunk has been taken from the init buffer. */
+    boolean doneInitBuffer = false;
+
+    public AutoNIOTransport(WireFormat format, Socket socket, InitBuffer initBuffer) throws IOException {
+        super(format, socket, initBuffer);
+    }
+
+    public AutoNIOTransport(WireFormat wireFormat, Socket socket) throws IOException {
+        super(wireFormat, socket);
+    }
+
+    public AutoNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation,
+            URI localLocation) throws UnknownHostException, IOException {
+        super(wireFormat, socketFactory, remoteLocation, localLocation);
+    }
+
+    /**
+     * Reads from the initial buffer while it has not been consumed, and from
+     * the channel afterwards.
+     *
+     * @return the number of bytes placed into the current buffer, or the
+     *         result of the channel read once the init buffer is done
+     * @throws IOException if no init buffer is set while it is still needed,
+     *         or if the channel read fails
+     */
+    @Override
+    protected int readFromBuffer() throws IOException {
+        if (doneInitBuffer) {
+            return channel.read(currentBuffer);
+        }
+        if (initBuffer == null) {
+            throw new IOException("Null initBuffer");
+        }
+
+        // No frame size known yet: this is the first read, so the init buffer
+        // still has to be switched from writing to reading.
+        final boolean firstChunk = nextFrameSize == -1;
+        if (firstChunk) {
+            this.initBuffer.buffer.flip();
+        }
+
+        for (int copied = 0; copied < INIT_CHUNK_SIZE; copied++) {
+            currentBuffer.put(initBuffer.buffer.get());
+        }
+
+        if (!firstChunk) {
+            doneInitBuffer = true;
+        }
+        return INIT_CHUNK_SIZE;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioSslTransportFactory.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioSslTransportFactory.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioSslTransportFactory.java
new file mode 100644
index 0000000..8142a2a
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioSslTransportFactory.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.transport.auto.nio;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.net.ServerSocketFactory;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.SslContext;
+import org.apache.activemq.broker.transport.auto.AutoTcpTransportServer;
+import org.apache.activemq.broker.transport.auto.AutoTransportUtils;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.nio.NIOSSLTransport;
+import org.apache.activemq.transport.nio.NIOSSLTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Transport factory that binds an {@link AutoNIOSSLTransportServer}, i.e. an
+ * NIO+SSL server that detects the wire protocol of each connection and
+ * delegates the creation of the transport to the factory of that protocol.
+ */
+public class AutoNioSslTransportFactory extends NIOSSLTransportFactory implements BrokerServiceAware {
+    protected BrokerService brokerService;
+
+    /**
+     * True once a bind saw an explicit {@code allowLinkStealing} option.
+     * NOTE(review): this is factory state read lazily by every server created
+     * by this factory - confirm a factory instance is never shared between
+     * connectors with different settings.
+     */
+    boolean allowLinkStealingSet = false;
+    private Set<String> enabledProtocols;
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService)
+     */
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
+
+    /**
+     * Creates the auto-detecting server. Its transports are created by the
+     * factory of the detected protocol and then configured with the SSL
+     * context and the client authentication settings of the server.
+     */
+    @Override
+    protected AutoNIOSSLTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory)
+            throws IOException, URISyntaxException {
+        return new AutoNIOSSLTransportServer(context, this, location, serverSocketFactory, brokerService, enabledProtocols) {
+
+            @Override
+            protected Transport createTransport(Socket socket, WireFormat format, SSLEngine engine, InitBuffer initBuffer,
+                    ByteBuffer inputBuffer, TcpTransportFactory detectedFactory) throws IOException {
+                NIOSSLTransport nioSslTransport = (NIOSSLTransport) detectedFactory.createTransport(
+                        format, socket, engine, initBuffer, inputBuffer);
+
+                // Link stealing is switched on for MQTT wire formats unless
+                // the option was given explicitly on the bind URI.
+                if (format.getClass().toString().contains("MQTT") && !allowLinkStealingSet) {
+                    this.setAllowLinkStealing(true);
+                }
+
+                if (context != null) {
+                    nioSslTransport.setSslContext(context);
+                }
+
+                nioSslTransport.setNeedClientAuth(isNeedClientAuth());
+                nioSslTransport.setWantClientAuth(isWantClientAuth());
+
+                return nioSslTransport;
+            }
+
+        };
+
+    }
+
+    /**
+     * Binds an auto-detecting NIO+SSL server to {@code location}.
+     * <p>
+     * The URI parameters are split by prefix: {@code auto.} options (among
+     * them {@code auto.protocols}) and {@code transport.} options are handed
+     * to the server, wire format options are extracted per protocol, and the
+     * parameters are applied to the server through
+     * {@code IntrospectionSupport.setProperties}.
+     *
+     * @param location the URI to bind to, including the options
+     * @return the bound server
+     * @throws IOException if the SSL context cannot be obtained, the URI is
+     *         invalid or binding fails
+     */
+    @Override
+    public TransportServer doBind(final URI location) throws IOException {
+        try {
+            if (SslContext.getCurrentSslContext() != null) {
+                try {
+                    context = SslContext.getCurrentSslContext().getSSLContext();
+                } catch (Exception e) {
+                    throw new IOException(e);
+                }
+            }
+
+            Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
+
+            Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto.");
+            this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols"));
+
+            ServerSocketFactory serverSocketFactory = createServerSocketFactory();
+            AutoTcpTransportServer server = createTcpTransportServer(location, serverSocketFactory);
+            server.setWireFormatFactory(new OpenWireFormatFactory());
+            if (options.get("allowLinkStealing") != null) {
+                allowLinkStealingSet = true;
+            }
+            IntrospectionSupport.setProperties(server, options);
+            // Reuse the "auto." options extracted above rather than extracting
+            // the prefix a second time: if the helper removes the entries it
+            // returns (presumably it does - confirm), a second extraction
+            // would hand the server an empty map.
+            server.setAutoTransportOptions(autoProperties);
+            server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport."));
+            server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options));
+            server.bind();
+
+            return server;
+        } catch (URISyntaxException e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioTransportFactory.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioTransportFactory.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioTransportFactory.java
new file mode 100644
index 0000000..0922ae6
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioTransportFactory.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.transport.auto.nio;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.net.ServerSocketFactory;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.transport.auto.AutoTcpTransportServer;
+import org.apache.activemq.broker.transport.auto.AutoTransportUtils;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.nio.NIOTransport;
+import org.apache.activemq.transport.nio.NIOTransportFactory;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransportFactory;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ *
+ *
+ */
+public class AutoNioTransportFactory extends NIOTransportFactory implements 
BrokerServiceAware {
+    protected BrokerService brokerService;
+    /* (non-Javadoc)
+     * @see 
org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService)
+     */
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
+
+    @Override
+    protected AutoTcpTransportServer createTcpTransportServer(URI location, 
ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException 
{
+        return new AutoTcpTransportServer(this, location, serverSocketFactory, 
brokerService, enabledProtocols) {
+            @Override
+            protected TcpTransport createTransport(Socket socket, WireFormat 
format, TcpTransportFactory detectedTransportFactory) throws IOException {
+                TcpTransport nioTransport = null;
+                if 
(detectedTransportFactory.getClass().equals(NIOTransportFactory.class)) {
+                    nioTransport = new AutoNIOTransport(format, 
socket,this.initBuffer);
+                } else {
+                    nioTransport = detectedTransportFactory.createTransport(
+                            format, socket, this.initBuffer);
+                }
+
+                if (format.getClass().toString().contains("MQTT")) {
+                    if (!allowLinkStealingSet) {
+                        this.setAllowLinkStealing(true);
+                    }
+                }
+
+                return nioTransport;
+            }
+        };
+
+    }
+
+    boolean allowLinkStealingSet = false;
+    private Set<String> enabledProtocols;
+
+    @Override
+    public TransportServer doBind(final URI location) throws IOException {
+        try {
+            Map<String, String> options = new HashMap<String, 
String>(URISupport.parseParameters(location));
+
+            Map<String, Object> autoProperties = 
IntrospectionSupport.extractProperties(options, "auto.");
+            this.enabledProtocols = AutoTransportUtils.parseProtocols((String) 
autoProperties.get("protocols"));
+
+            ServerSocketFactory serverSocketFactory = 
createServerSocketFactory();
+            AutoTcpTransportServer server = createTcpTransportServer(location, 
serverSocketFactory);
+            //server.setWireFormatFactory(createWireFormatFactory(options));
+            server.setWireFormatFactory(new OpenWireFormatFactory());
+            if (options.get("allowLinkStealing") != null){
+                allowLinkStealingSet = true;
+            }
+            IntrospectionSupport.setProperties(server, options);
+            
server.setTransportOption(IntrospectionSupport.extractProperties(options, 
"transport."));
+            
server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options));
+            server.bind();
+
+            return server;
+        } catch (URISyntaxException e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/AmqpProtocolVerifier.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/AmqpProtocolVerifier.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/AmqpProtocolVerifier.java
new file mode 100644
index 0000000..11f0574
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/AmqpProtocolVerifier.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.transport.protocol;
+
+
+/**
+ * {@link ProtocolVerifier} that recognizes AMQP by the ASCII marker
+ * {@code "AMQP"} at the very start of the bytes received from a client.
+ */
+public class AmqpProtocolVerifier implements ProtocolVerifier {
+
+    static final byte[] PREFIX = new byte[] { 'A', 'M', 'Q', 'P' };
+
+    /**
+     * Checks whether the given bytes start with the AMQP marker.
+     *
+     * @param value the first bytes read from the connection; may be
+     *        {@code null} or shorter than the marker
+     * @return {@code true} if {@code value} starts with {@code "AMQP"},
+     *         {@code false} otherwise, including when too few bytes are
+     *         available to decide
+     */
+    @Override
+    public boolean isProtocol(byte[] value) {
+        // Too little data cannot be AMQP; answer "no" instead of failing
+        // with an ArrayIndexOutOfBoundsException on a short read.
+        if (value == null || value.length < PREFIX.length) {
+            return false;
+        }
+        for (int i = 0; i < PREFIX.length; i++) {
+            if (value[i] != PREFIX[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/MqttProtocolVerifier.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/MqttProtocolVerifier.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/MqttProtocolVerifier.java
new file mode 100644
index 0000000..4b4edd2
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/MqttProtocolVerifier.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.transport.protocol;
+
+/**
+ * {@link ProtocolVerifier} that recognizes MQTT by the protocol name found
+ * at byte offset 4 of the first bytes received from a client: {@code "MQTT"}
+ * (checked as MQTT 3.1.1) or {@code "MQIs"} (checked as MQTT 3.1).
+ * <p>
+ * NOTE(review): the fixed offset presumably relies on the first packet
+ * carrying a one byte remaining-length field followed by the two byte length
+ * prefix of the protocol name - confirm against the MQTT CONNECT layout.
+ */
+public class MqttProtocolVerifier implements ProtocolVerifier {
+
+    /** Index of the first byte of the protocol name. */
+    private static final int NAME_OFFSET = 4;
+
+    private static final byte[] MQTT_311_NAME = { 'M', 'Q', 'T', 'T' };
+    private static final byte[] MQTT_31_NAME = { 'M', 'Q', 'I', 's' };
+
+    /**
+     * Checks whether the given bytes carry one of the known MQTT protocol
+     * names at offset 4.
+     *
+     * @param value the first bytes read from the connection; may be
+     *        {@code null} or too short to contain the name
+     * @return {@code true} if an MQTT protocol name is found, {@code false}
+     *         otherwise, including when too few bytes are available to decide
+     */
+    @Override
+    public boolean isProtocol(byte[] value) {
+        // Too little data cannot be matched; answer "no" instead of failing
+        // with an ArrayIndexOutOfBoundsException on a short read.
+        if (value == null || value.length < NAME_OFFSET + MQTT_311_NAME.length) {
+            return false;
+        }
+        return hasNameAtOffset(value, MQTT_311_NAME) || hasNameAtOffset(value, MQTT_31_NAME);
+    }
+
+    /** Compares the bytes starting at the name offset with the expected name. */
+    private static boolean hasNameAtOffset(byte[] value, byte[] name) {
+        for (int i = 0; i < name.length; i++) {
+            if (value[NAME_OFFSET + i] != name[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+}

Reply via email to