https://issues.apache.org/jira/browse/AMQ-5889
Adding support for auto detection of wire protocols over a transport. OpenWire, AMQP, STOMP, and MQTT can all be detected and the broker will properly handle each one over a given Transport. Currently auto TCP, NIO, SSL, and NIO+SSL transports can handle auto-detection of the wire format and client but support could be added in the future for other transports like websockets. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/04ee70a1 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/04ee70a1 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/04ee70a1 Branch: refs/heads/master Commit: 04ee70a161b463f69692debea623d754a5c781c2 Parents: 9f50ce3 Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Thu Jul 16 18:23:24 2015 +0000 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Tue Aug 11 19:39:29 2015 +0000 ---------------------------------------------------------------------- activemq-amqp/pom.xml | 21 ++ .../transport/amqp/AmqpNioSslTransport.java | 43 ++- .../amqp/AmqpNioSslTransportFactory.java | 10 + .../transport/amqp/AmqpNioTransport.java | 33 +- .../transport/amqp/AmqpNioTransportFactory.java | 7 + .../transport/amqp/AmqpTestSupport.java | 53 ++++ .../amqp/auto/JMSClientAutoNioPlusSslTest.java | 51 ++++ .../amqp/auto/JMSClientAutoNioTest.java | 51 ++++ .../amqp/auto/JMSClientAutoPlusSslTest.java | 51 ++++ .../transport/amqp/auto/JMSClientAutoTest.java | 50 ++++ activemq-broker/pom.xml | 21 ++ .../activemq/broker/TransportConnection.java | 1 + .../transport/auto/AutoSslTransportFactory.java | 117 ++++++++ .../transport/auto/AutoSslTransportServer.java | 149 ++++++++++ .../transport/auto/AutoTcpTransportFactory.java | 110 +++++++ .../transport/auto/AutoTcpTransportServer.java | 298 +++++++++++++++++++ .../transport/auto/AutoTransportUtils.java | 62 ++++ .../auto/nio/AutoNIOSSLTransportServer.java | 122 ++++++++ .../transport/auto/nio/AutoNIOTransport.java | 85 ++++++ .../auto/nio/AutoNioSslTransportFactory.java | 131 ++++++++ .../auto/nio/AutoNioTransportFactory.java | 114 +++++++ .../protocol/AmqpProtocolVerifier.java | 36 +++ .../protocol/MqttProtocolVerifier.java | 45 +++ .../protocol/OpenWireProtocolVerifier.java | 67 +++++ .../transport/protocol/ProtocolVerifier.java | 9 + .../protocol/StompProtocolVerifier.java | 39 +++ .../transport/nio/AutoInitNioSSLTransport.java | 234 +++++++++++++++ .../services/org/apache/activemq/transport/auto | 17 ++ .../org/apache/activemq/transport/auto+nio | 17 ++ .../org/apache/activemq/transport/auto+nio+ssl | 17 ++ .../org/apache/activemq/transport/auto+ssl | 17 ++ activemq-camel/pom.xml | 22 +- activemq-client/pom.xml | 21 ++ .../activemq/ActiveMQConnectionFactory.java | 18 +- .../activemq/transport/TransportFactory.java | 7 + .../activemq/transport/nio/NIOSSLTransport.java | 109 +++++-- .../transport/nio/NIOSSLTransportFactory.java | 16 + .../transport/nio/NIOSSLTransportServer.java | 2 +- .../activemq/transport/nio/NIOTransport.java | 16 +- .../transport/nio/NIOTransportFactory.java | 20 ++ .../activemq/transport/tcp/SslTransport.java | 22 +- .../transport/tcp/TcpBufferedInputStream.java | 24 +- .../activemq/transport/tcp/TcpTransport.java | 65 +++- .../transport/tcp/TcpTransportFactory.java | 13 + .../transport/tcp/TcpTransportServer.java | 44 ++- activemq-http/pom.xml | 22 +- activemq-jaas/pom.xml | 21 ++ activemq-jms-pool/pom.xml | 21 ++ activemq-kahadb-store/pom.xml | 23 +- activemq-karaf-itest/pom.xml | 21 ++ activemq-leveldb-store/pom.xml | 21 ++ activemq-mqtt/pom.xml | 21 ++ .../transport/mqtt/MQTTNIOSSLTransport.java | 23 +- .../mqtt/MQTTNIOSSLTransportFactory.java | 12 + .../transport/mqtt/MQTTNIOTransport.java | 34 ++- .../transport/mqtt/MQTTNIOTransportFactory.java | 7 + .../transport/mqtt/auto/MQTTAutoNioSslTest.java | 35 +++ .../transport/mqtt/auto/MQTTAutoNioTest.java | 35 +++ .../transport/mqtt/auto/MQTTAutoSslTest.java | 35 +++ .../transport/mqtt/auto/MQTTAutoTest.java | 35 +++ activemq-partition/pom.xml | 21 ++ activemq-pool/pom.xml | 24 ++ activemq-ra/pom.xml | 21 ++ activemq-runtime-config/pom.xml | 21 ++ activemq-spring/pom.xml | 21 ++ activemq-stomp/pom.xml | 21 ++ .../transport/stomp/StompNIOSSLTransport.java | 24 +- .../stomp/StompNIOSSLTransportFactory.java | 10 + .../transport/stomp/StompNIOTransport.java | 38 ++- .../stomp/StompNIOTransportFactory.java | 13 + .../transport/stomp/StompTestSupport.java | 56 +++- .../stomp/auto/StompAutoNioSslTest.java | 44 +++ .../transport/stomp/auto/StompAutoNioTest.java | 40 +++ .../transport/stomp/auto/StompAutoSslTest.java | 44 +++ .../transport/stomp/auto/StompAutoTest.java | 40 +++ activemq-unit-tests/pom.xml | 21 ++ .../auto/AutoNIOSslTransportBrokerTest.java | 67 +++++ .../auto/AutoNIOTransportBrokerTest.java | 38 +++ .../auto/AutoSslTransportBrokerTest.java | 64 ++++ .../transport/auto/AutoTransportBrokerTest.java | 38 +++ .../auto/AutoTransportConfigureTest.java | 156 ++++++++++ .../auto/failover/AutoFailoverClusterTest.java | 27 ++ .../auto/failover/AutoFailoverTimeoutTest.java | 27 ++ .../auto/failover/FailoverAutoRandomTest.java | 28 ++ .../FailoverAutoTransportBrokerTest.java | 43 +++ .../AutoNIOJmsDurableTopicSendReceiveTest.java | 29 ++ .../auto/nio/AutoNIOJmsSendAndReceiveTest.java | 32 ++ .../AutoNIOPersistentSendAndReceiveTest.java | 27 ++ .../transport/auto/nio/AutoNIOSSLBasicTest.java | 27 ++ .../transport/failover/FailoverClusterTest.java | 12 +- .../transport/failover/FailoverRandomTest.java | 22 +- .../transport/failover/FailoverTimeoutTest.java | 4 + .../activemq/transport/nio/NIOSSLBasicTest.java | 10 +- .../nio/NIOSSLTransportBrokerTest.java | 4 +- activemq-web-demo/pom.xml | 21 ++ activemq-web/pom.xml | 21 ++ assembly/pom.xml | 21 ++ 97 files changed, 3912 insertions(+), 108 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml index 4d3c8ef..5f10825 100644 --- a/activemq-amqp/pom.xml +++ b/activemq-amqp/pom.xml @@ -205,6 +205,27 @@ </plugins> </build> </profile> + <profile> + <id>activemq.tests-autoTransport</id> + <activation> + <property> + <name>activemq.tests</name> + <value>autoTransport</value> + </property> + </activation> + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <includes> + <include>**/auto/*Test.java</include> + </includes> + </configuration> + </plugin> + </plugins> + </build> + </profile> <profile> <id>activemq.tests.windows.excludes</id> http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java index 3276be9..88d9284 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransport.java @@ -23,6 +23,7 @@ import java.net.UnknownHostException; import java.nio.ByteBuffer; import javax.net.SocketFactory; +import javax.net.ssl.SSLEngine; import org.apache.activemq.transport.nio.NIOSSLTransport; import org.apache.activemq.wireformat.WireFormat; @@ -38,7 +39,14 @@ public class AmqpNioSslTransport extends NIOSSLTransport { } public AmqpNioSslTransport(WireFormat wireFormat, Socket socket) throws IOException { - super(wireFormat, socket); + super(wireFormat, socket, null, null, null); + + frameReader.setWireFormat((AmqpWireFormat) wireFormat); + } + + public AmqpNioSslTransport(WireFormat wireFormat, Socket socket, + SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer) throws IOException { + super(wireFormat, socket, engine, initBuffer, inputBuffer); frameReader.setWireFormat((AmqpWireFormat) wireFormat); } @@ -55,4 +63,37 @@ public class AmqpNioSslTransport extends NIOSSLTransport { protected void processCommand(ByteBuffer plain) throws Exception { frameReader.parse(plain); } + + /* (non-Javadoc) + * @see org.apache.activemq.transport.nio.NIOSSLTransport#secureRead(java.nio.ByteBuffer) + */ + + @Override + protected void doInit() { + if (initBuffer != null) { + nextFrameSize = -1; + serviceRead(); + + } + } + + @Override + protected int secureRead(ByteBuffer plain) throws Exception { + if (initBuffer != null) { + initBuffer.buffer.flip(); + if (initBuffer.buffer.hasRemaining()) { + plain.flip(); + for (int i =0; i < 8; i++) { + plain.put(initBuffer.buffer.get()); + } + plain.flip(); + processCommand(plain); + initBuffer.buffer.clear(); + return 8; + } + } + return super.secureRead(plain); + } + + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java index 5e2fa06..7858a56 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioSslTransportFactory.java @@ -21,15 +21,18 @@ import java.net.Socket; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.nio.ByteBuffer; import javax.net.ServerSocketFactory; import javax.net.SocketFactory; import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; import org.apache.activemq.broker.SslContext; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; import org.apache.activemq.transport.tcp.TcpTransportServer; import org.apache.activemq.wireformat.WireFormat; @@ -62,6 +65,13 @@ public class AmqpNioSslTransportFactory extends AmqpNioTransportFactory { } @Override + public TcpTransport createTransport(WireFormat wireFormat, Socket socket, + SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer) + throws IOException { + return new AmqpNioSslTransport(wireFormat, socket, engine, initBuffer, inputBuffer); + } + + @Override public TransportServer doBind(URI location) throws IOException { if (SslContext.getCurrentSslContext() != null) { try { http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java index 21d40eb..dfa1569 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.amqp; +import java.io.ByteArrayInputStream; import java.io.DataOutputStream; import java.io.EOFException; import java.io.IOException; @@ -63,6 +64,12 @@ public class AmqpNioTransport extends TcpTransport { frameReader.setWireFormat((AmqpWireFormat) wireFormat); } + public AmqpNioTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException { + super(wireFormat, socket, initBuffer); + + frameReader.setWireFormat((AmqpWireFormat) wireFormat); + } + @Override protected void initializeStreams() throws IOException { channel = socket.getChannel(); @@ -91,6 +98,17 @@ public class AmqpNioTransport extends TcpTransport { NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024); this.dataOut = new DataOutputStream(outPutStream); this.buffOut = outPutStream; + + try { + if (initBuffer != null) { + processBuffer(initBuffer.buffer, initBuffer.readSize); + } + } catch (IOException e) { + onException(e); + } catch (Throwable e) { + onException(IOExceptionSupport.create(e)); + } + } boolean magicRead = false; @@ -101,6 +119,7 @@ public class AmqpNioTransport extends TcpTransport { while (isStarted()) { // read channel int readSize = channel.read(inputBuffer); + // channel is closed, cleanup if (readSize == -1) { onException(new EOFException()); @@ -112,11 +131,7 @@ public class AmqpNioTransport extends TcpTransport { break; } - receiveCounter += readSize; - - inputBuffer.flip(); - frameReader.parse(inputBuffer); - inputBuffer.clear(); + processBuffer(inputBuffer, readSize); } } catch (IOException e) { onException(e); @@ -125,6 +140,14 @@ public class AmqpNioTransport extends TcpTransport { } } + protected void processBuffer(ByteBuffer buffer, int readSize) throws Exception { + receiveCounter += readSize; + + buffer.flip(); + frameReader.parse(buffer); + buffer.clear(); + } + @Override protected void doStart() throws Exception { connect(); http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java index d54d58f..4b6d9bd 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransportFactory.java @@ -33,6 +33,7 @@ import org.apache.activemq.transport.MutexTransport; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.nio.NIOTransportFactory; import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; import org.apache.activemq.transport.tcp.TcpTransportServer; import org.apache.activemq.util.IntrospectionSupport; import org.apache.activemq.wireformat.WireFormat; @@ -64,6 +65,12 @@ public class AmqpNioTransportFactory extends NIOTransportFactory implements Brok return new AmqpNioTransport(wf, socketFactory, location, localLocation); } + @Override + public TcpTransport createTransport(WireFormat wireFormat, Socket socket, + InitBuffer initBuffer) throws IOException { + return new AmqpNioTransport(wireFormat, socket, initBuffer); + } + @SuppressWarnings("rawtypes") @Override public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception { http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java index 91909d4..45af469 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java @@ -79,6 +79,15 @@ public class AmqpTestSupport { protected URI amqpNioPlusSslURI; protected int amqpNioPlusSslPort; + protected URI autoURI; + protected int autoPort; + protected URI autoSslURI; + protected int autoSslPort; + protected URI autoNioURI; + protected int autoNioPort; + protected URI autoNioPlusSslURI; + protected int autoNioPlusSslPort; + protected URI openwireURI; protected int openwirePort; @@ -176,6 +185,34 @@ public class AmqpTestSupport { amqpNioPlusSslURI = connector.getPublishableConnectURI(); LOG.debug("Using amqp+nio+ssl port " + amqpNioPlusSslPort); } + if (isUseAutoConnector()) { + connector = brokerService.addConnector( + "auto://0.0.0.0:" + autoPort + getAdditionalConfig()); + autoPort = connector.getConnectUri().getPort(); + autoURI = connector.getPublishableConnectURI(); + LOG.debug("Using auto port " + autoPort); + } + if (isUseAutoSslConnector()) { + connector = brokerService.addConnector( + "auto+ssl://0.0.0.0:" + autoSslPort + getAdditionalConfig()); + autoSslPort = connector.getConnectUri().getPort(); + autoSslURI = connector.getPublishableConnectURI(); + LOG.debug("Using auto+ssl port " + autoSslPort); + } + if (isUseAutoNioConnector()) { + connector = brokerService.addConnector( + "auto+nio://0.0.0.0:" + autoNioPort + getAdditionalConfig()); + autoNioPort = connector.getConnectUri().getPort(); + autoNioURI = connector.getPublishableConnectURI(); + LOG.debug("Using auto+nio port " + autoNioPort); + } + if (isUseAutoNioPlusSslConnector()) { + connector = brokerService.addConnector( + "auto+nio+ssl://0.0.0.0:" + autoNioPlusSslPort + getAdditionalConfig()); + autoNioPlusSslPort = connector.getConnectUri().getPort(); + autoNioPlusSslURI = connector.getPublishableConnectURI(); + LOG.debug("Using auto+nio+ssl port " + autoNioPlusSslPort); + } } protected boolean isPersistent() { @@ -206,6 +243,22 @@ public class AmqpTestSupport { return false; } + protected boolean isUseAutoConnector() { + return false; + } + + protected boolean isUseAutoSslConnector() { + return false; + } + + protected boolean isUseAutoNioConnector() { + return false; + } + + protected boolean isUseAutoNioPlusSslConnector() { + return false; + } + protected String getAmqpTransformer() { return "jms"; } http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioPlusSslTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioPlusSslTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioPlusSslTest.java new file mode 100644 index 0000000..0bf2e38 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioPlusSslTest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.auto; + +import java.net.URI; + +import org.apache.activemq.transport.amqp.JMSClientNioPlusSslTest; +import org.apache.activemq.transport.amqp.JMSClientSslTest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test the JMS client when connected to the NIO+SSL transport. + */ +public class JMSClientAutoNioPlusSslTest extends JMSClientSslTest { + protected static final Logger LOG = LoggerFactory.getLogger(JMSClientAutoNioPlusSslTest.class); + + @Override + protected URI getBrokerURI() { + return autoNioPlusSslURI; + } + + @Override + protected boolean isUseTcpConnector() { + return false; + } + + @Override + protected boolean isUseAutoNioPlusSslConnector() { + return true; + } + + @Override + protected String getTargetConnectorName() { + return "auto+nio+ssl"; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioTest.java new file mode 100644 index 0000000..5346a79 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoNioTest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.auto; + +import java.net.URI; + +import org.apache.activemq.transport.amqp.JMSClientNioTest; +import org.apache.activemq.transport.amqp.JMSClientTest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test the JMS client when connected to the NIO transport. + */ +public class JMSClientAutoNioTest extends JMSClientTest { + protected static final Logger LOG = LoggerFactory.getLogger(JMSClientAutoNioTest.class); + + @Override + protected URI getBrokerURI() { + return autoNioURI; + } + + @Override + protected boolean isUseTcpConnector() { + return false; + } + + @Override + protected boolean isUseAutoNioConnector() { + return true; + } + + @Override + protected String getTargetConnectorName() { + return "auto+nio"; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoPlusSslTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoPlusSslTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoPlusSslTest.java new file mode 100644 index 0000000..deb90fb --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoPlusSslTest.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.auto; + +import java.net.URI; + +import org.apache.activemq.transport.amqp.JMSClientNioPlusSslTest; +import org.apache.activemq.transport.amqp.JMSClientSslTest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test the JMS client when connected to the NIO+SSL transport. + */ +public class JMSClientAutoPlusSslTest extends JMSClientSslTest { + protected static final Logger LOG = LoggerFactory.getLogger(JMSClientAutoPlusSslTest.class); + + @Override + protected URI getBrokerURI() { + return autoSslURI; + } + + @Override + protected boolean isUseTcpConnector() { + return false; + } + + @Override + protected boolean isUseAutoSslConnector() { + return true; + } + + @Override + protected String getTargetConnectorName() { + return "auto+ssl"; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoTest.java new file mode 100644 index 0000000..77683e4 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/auto/JMSClientAutoTest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.auto; + +import java.net.URI; + +import org.apache.activemq.transport.amqp.JMSClientTest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test the JMS client when connected to the NIO transport. + */ +public class JMSClientAutoTest extends JMSClientTest { + protected static final Logger LOG = LoggerFactory.getLogger(JMSClientAutoTest.class); + + @Override + protected boolean isUseTcpConnector() { + return false; + } + + @Override + protected boolean isUseAutoConnector() { + return true; + } + + @Override + protected URI getBrokerURI() { + return autoURI; + } + + @Override + protected String getTargetConnectorName() { + return "auto"; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-broker/pom.xml b/activemq-broker/pom.xml index bc35400..5f8545b 100755 --- a/activemq-broker/pom.xml +++ b/activemq-broker/pom.xml @@ -238,5 +238,26 @@ </plugins> </build> </profile> + <profile> + <id>activemq.tests-autoTransport</id> + <activation> + <property> + <name>activemq.tests</name> + <value>autoTransport</value> + </property> + </activation> + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <excludes> + <exclude>**</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + </profile> </profiles> </project> http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 2727503..a9d36b5 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -239,6 +239,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { } if (!stopping.get() && !pendingStop) { transportException.set(e); + e.printStackTrace(); if (TRANSPORTLOG.isDebugEnabled()) { TRANSPORTLOG.debug(this + " failed: " + e, e); } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) { http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportFactory.java new file mode 100644 index 0000000..7c65311 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportFactory.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.transport.auto; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import javax.net.ServerSocketFactory; +import javax.net.ssl.SSLServerSocketFactory; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.tcp.SslTransportFactory; +import org.apache.activemq.transport.tcp.SslTransportServer; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.URISupport; +import org.apache.activemq.wireformat.WireFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class AutoSslTransportFactory extends SslTransportFactory implements BrokerServiceAware { + private static final Logger LOG = LoggerFactory.getLogger(AutoSslTransportFactory.class); + + + protected BrokerService brokerService; + /* (non-Javadoc) + * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService) + */ + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } + + private Set<String> enabledProtocols; + + /** + * Overriding to use SslTransportServer and allow for proper reflection. + */ + @Override + public TransportServer doBind(final URI location) throws IOException { + try { + Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); + + Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto."); + this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols")); + + ServerSocketFactory serverSocketFactory = createServerSocketFactory(); + AutoSslTransportServer server = createAutoSslTransportServer(location, (SSLServerSocketFactory)serverSocketFactory); + if (options.get("allowLinkStealing") != null){ + allowLinkStealingSet = true; + } + IntrospectionSupport.setProperties(server, options); + server.setAutoTransportOptions(IntrospectionSupport.extractProperties(options, "auto.")); + server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport.")); + server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options)); + server.bind(); + + return server; + } catch (URISyntaxException e) { + throw IOExceptionSupport.create(e); + } + } + + boolean allowLinkStealingSet = false; + + /** + * Allows subclasses of SslTransportFactory to create custom instances of + * SslTransportServer. + * + * @param location + * @param serverSocketFactory + * @return + * @throws IOException + * @throws URISyntaxException + */ + // @Override + protected AutoSslTransportServer createAutoSslTransportServer(final URI location, SSLServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + AutoSslTransportServer server = new AutoSslTransportServer(this, location, serverSocketFactory, + this.brokerService, enabledProtocols) { + @Override + protected TcpTransport createTransport(Socket socket, WireFormat format) + throws IOException { + if (format.getClass().toString().contains("MQTT") && !allowLinkStealingSet) { + this.setAllowLinkStealing(true); + } + return super.createTransport(socket, format); + } + }; + return server; + } + + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportServer.java new file mode 100644 index 0000000..9954523 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoSslTransportServer.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker.transport.auto; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Set; + +import javax.net.ServerSocketFactory; +import javax.net.ssl.SSLServerSocket; +import javax.net.ssl.SSLServerSocketFactory; +import javax.net.ssl.SSLSocket; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.tcp.SslTransport; +import org.apache.activemq.transport.tcp.SslTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.wireformat.WireFormat; + +/** + * An SSL TransportServer. + * + * Allows for client certificate authentication (refer to setNeedClientAuth for + * details). + * NOTE: Client certificate authentication is disabled by default. + * + */ +public class AutoSslTransportServer extends AutoTcpTransportServer { + + + + // Specifies if sockets created from this server should needClientAuth. + private boolean needClientAuth; + + // Specifies if sockets created from this server should wantClientAuth. + private boolean wantClientAuth; + +// /** +// * Creates a ssl transport server for the specified url using the provided +// * serverSocketFactory +// * +// * @param transportFactory The factory used to create transports when connections arrive. +// * @param location The location of the broker to bind to. +// * @param serverSocketFactory The factory used to create this server. +// * @throws IOException passed up from TcpTransportFactory. +// * @throws URISyntaxException passed up from TcpTransportFactory. +// */ +// public SslTransportServer(SslTransportFactory transportFactory, URI location, SSLServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { +// super(transportFactory, location, serverSocketFactory); +// } + + public AutoSslTransportServer(SslTransportFactory transportFactory, + URI location, SSLServerSocketFactory serverSocketFactory, + BrokerService brokerService, Set<String> enabledProtocols) throws IOException, URISyntaxException { + super(transportFactory, location, serverSocketFactory, brokerService, enabledProtocols); + // TODO Auto-generated constructor stub + } + + /** + * Sets whether client authentication should be required + * Must be called before {@link #bind()} + * Note: Calling this method clears the wantClientAuth flag + * in the underlying implementation. + */ + public void setNeedClientAuth(boolean needAuth) { + this.needClientAuth = needAuth; + } + + /** + * Returns whether client authentication should be required. + */ + public boolean getNeedClientAuth() { + return this.needClientAuth; + } + + /** + * Returns whether client authentication should be requested. + */ + public boolean getWantClientAuth() { + return this.wantClientAuth; + } + + /** + * Sets whether client authentication should be requested. + * Must be called before {@link #bind()} + * Note: Calling this method clears the needClientAuth flag + * in the underlying implementation. + */ + public void setWantClientAuth(boolean wantAuth) { + this.wantClientAuth = wantAuth; + } + + /** + * Binds this socket to the previously specified URI. + * + * Overridden to allow for proper handling of needClientAuth. + * + * @throws IOException passed up from TcpTransportServer. + */ + @Override + public void bind() throws IOException { + super.bind(); + if (needClientAuth) { + ((SSLServerSocket)this.serverSocket).setNeedClientAuth(true); + } else if (wantClientAuth) { + ((SSLServerSocket)this.serverSocket).setWantClientAuth(true); + } + } + + /** + * Used to create Transports for this server. + * + * Overridden to allow the use of SslTransports (instead of TcpTransports). + * + * @param socket The incoming socket that will be wrapped into the new Transport. + * @param format The WireFormat being used. + * @return The newly return (SSL) Transport. + * @throws IOException + */ + @Override + protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException { + return new SslTransport(format, (SSLSocket)socket, this.initBuffer); + } + + @Override + public boolean isSslServer() { + return true; + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportFactory.java new file mode 100644 index 0000000..5731b85 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportFactory.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.transport.auto; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import javax.net.ServerSocketFactory; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.openwire.OpenWireFormatFactory; +import org.apache.activemq.transport.MutexTransport; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.util.FactoryFinder; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.URISupport; +import org.apache.activemq.wireformat.WireFormat; +import org.apache.activemq.wireformat.WireFormatFactory; + +/** + * + * + */ +public class AutoTcpTransportFactory extends TcpTransportFactory implements BrokerServiceAware { + + protected BrokerService brokerService; + /* (non-Javadoc) + * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService) + */ + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } + + + @Override + public TransportServer doBind(final URI location) throws IOException { + try { + Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); + + Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto."); + this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols")); + + ServerSocketFactory serverSocketFactory = createServerSocketFactory(); + AutoTcpTransportServer server = createTcpTransportServer(location, serverSocketFactory); + //server.setWireFormatFactory(createWireFormatFactory(options)); + server.setWireFormatFactory(new OpenWireFormatFactory()); + if (options.get("allowLinkStealing") != null){ + allowLinkStealingSet = true; + } + IntrospectionSupport.setProperties(server, options); + server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport.")); + server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options)); + server.bind(); + + return server; + } catch (URISyntaxException e) { + throw IOExceptionSupport.create(e); + } + } + + boolean allowLinkStealingSet = false; + private Set<String> enabledProtocols; + + @Override + protected AutoTcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + AutoTcpTransportServer server = new AutoTcpTransportServer(this, location, serverSocketFactory, brokerService, enabledProtocols) { + + @Override + protected TcpTransport createTransport(Socket socket, WireFormat format) + throws IOException { + if (format.getClass().toString().contains("MQTT") && !allowLinkStealingSet) { + this.setAllowLinkStealing(true); + } + return super.createTransport(socket, format); + } + + }; + + return server; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportServer.java new file mode 100644 index 0000000..65a0fe5 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTcpTransportServer.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.transport.auto; + +import java.io.IOException; +import java.io.InputStream; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.net.ServerSocketFactory; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.broker.transport.protocol.AmqpProtocolVerifier; +import org.apache.activemq.broker.transport.protocol.MqttProtocolVerifier; +import org.apache.activemq.broker.transport.protocol.OpenWireProtocolVerifier; +import org.apache.activemq.broker.transport.protocol.ProtocolVerifier; +import org.apache.activemq.broker.transport.protocol.StompProtocolVerifier; +import org.apache.activemq.openwire.OpenWireFormatFactory; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransportServer; +import org.apache.activemq.util.FactoryFinder; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.wireformat.WireFormat; +import org.apache.activemq.wireformat.WireFormatFactory; +import org.fusesource.hawtbuf.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A TCP based implementation of {@link TransportServer} + */ +public class AutoTcpTransportServer extends TcpTransportServer { + + private static final Logger LOG = LoggerFactory.getLogger(AutoTcpTransportServer.class); + + protected Map<String, Map<String, Object>> wireFormatOptions; + protected Map<String, Object> autoTransportOptions; + protected Set<String> enabledProtocols; + protected final Map<String, ProtocolVerifier> protocolVerifiers = new ConcurrentHashMap<String, ProtocolVerifier>(); + + protected BrokerService brokerService; + + private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/"); + private final ConcurrentMap<String, TransportFactory> transportFactories = new ConcurrentHashMap<String, TransportFactory>(); + + private static final FactoryFinder WIREFORMAT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/wireformat/"); + + public WireFormatFactory findWireFormatFactory(String scheme, Map<String, Map<String, Object>> options) throws IOException { + WireFormatFactory wff = null; + try { + wff = (WireFormatFactory)WIREFORMAT_FACTORY_FINDER.newInstance(scheme); + if (options != null) { + IntrospectionSupport.setProperties(wff, options.get(AutoTransportUtils.ALL)); + IntrospectionSupport.setProperties(wff, options.get(scheme)); + } + if (wff instanceof OpenWireFormatFactory) { + protocolVerifiers.put(AutoTransportUtils.OPENWIRE, new OpenWireProtocolVerifier((OpenWireFormatFactory) wff)); + } + return wff; + } catch (Throwable e) { + throw IOExceptionSupport.create("Could not create wire format factory for: " + scheme + ", reason: " + e, e); + } + } + + public TransportFactory findTransportFactory(String scheme, Map<String, ?> options) throws IOException { + scheme = append(scheme, "nio"); + scheme = append(scheme, "ssl"); + + if (scheme.isEmpty()) { + scheme = "tcp"; + } + + TransportFactory tf = transportFactories.get(scheme); + if (tf == null) { + // Try to load if from a META-INF property. + try { + tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme); + if (options != null) + IntrospectionSupport.setProperties(tf, options); + transportFactories.put(scheme, tf); + } catch (Throwable e) { + throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e); + } + } + return tf; + } + + protected String append(String currentScheme, String scheme) { + if (this.getBindLocation().getScheme().contains(scheme)) { + if (!currentScheme.isEmpty()) { + currentScheme += "+"; + } + currentScheme += scheme; + } + return currentScheme; + } + + /** + * @param transportFactory + * @param location + * @param serverSocketFactory + * @throws IOException + * @throws URISyntaxException + */ + public AutoTcpTransportServer(TcpTransportFactory transportFactory, + URI location, ServerSocketFactory serverSocketFactory, BrokerService brokerService, + Set<String> enabledProtocols) + throws IOException, URISyntaxException { + super(transportFactory, location, serverSocketFactory); + service = Executors.newCachedThreadPool(); + this.brokerService = brokerService; + this.enabledProtocols = enabledProtocols; + initProtocolVerifiers(); + } + + @Override + public void setWireFormatFactory(WireFormatFactory factory) { + super.setWireFormatFactory(factory); + initOpenWireProtocolVerifier(); + } + + protected void initProtocolVerifiers() { + initOpenWireProtocolVerifier(); + + if (isAllProtocols() || enabledProtocols.contains(AutoTransportUtils.AMQP)) { + protocolVerifiers.put(AutoTransportUtils.AMQP, new AmqpProtocolVerifier()); + } + if (isAllProtocols() || enabledProtocols.contains(AutoTransportUtils.STOMP)) { + protocolVerifiers.put(AutoTransportUtils.STOMP, new StompProtocolVerifier()); + } + if (isAllProtocols()|| enabledProtocols.contains(AutoTransportUtils.MQTT)) { + protocolVerifiers.put(AutoTransportUtils.MQTT, new MqttProtocolVerifier()); + } + } + + protected void initOpenWireProtocolVerifier() { + if (isAllProtocols() || enabledProtocols.contains(AutoTransportUtils.OPENWIRE)) { + OpenWireProtocolVerifier owpv; + if (wireFormatFactory instanceof OpenWireFormatFactory) { + owpv = new OpenWireProtocolVerifier((OpenWireFormatFactory) wireFormatFactory); + } else { + owpv = new OpenWireProtocolVerifier(new OpenWireFormatFactory()); + } + protocolVerifiers.put(AutoTransportUtils.OPENWIRE, owpv); + } + } + + protected boolean isAllProtocols() { + return enabledProtocols == null || enabledProtocols.isEmpty(); + } + + + protected final ExecutorService service; + + + /** + * This holds the initial buffer that has been read to detect the protocol. + */ + public InitBuffer initBuffer; + + @Override + protected void handleSocket(final Socket socket) { + final AutoTcpTransportServer server = this; + + //This needs to be done in a new thread because + //the socket might be waiting on the client to send bytes + //doHandleSocket can't complete until the protocol can be detected + service.submit(new Runnable() { + @Override + public void run() { + server.doHandleSocket(socket); + } + }); + } + + @Override + protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception { + InputStream is = socket.getInputStream(); + + //We need to peak at the first 8 bytes of the buffer to detect the protocol + Buffer magic = new Buffer(8); + magic.readFrom(is); + + ProtocolInfo protocolInfo = detectProtocol(magic.getData()); + + initBuffer = new InitBuffer(8, ByteBuffer.allocate(8)); + initBuffer.buffer.put(magic.getData()); + + if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) { + ((BrokerServiceAware) protocolInfo.detectedTransportFactory).setBrokerService(brokerService); + } + + WireFormat format = protocolInfo.detectedWireFormatFactory.createWireFormat(); + Transport transport = createTransport(socket, format, protocolInfo.detectedTransportFactory); + + return new TransportInfo(format, transport, protocolInfo.detectedTransportFactory); + } + + @Override + protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException { + return new TcpTransport(format, socket, this.initBuffer); + } + + /** + * @param socket + * @param format + * @param detectedTransportFactory + * @return + */ + protected TcpTransport createTransport(Socket socket, WireFormat format, + TcpTransportFactory detectedTransportFactory) throws IOException { + return createTransport(socket, format); + } + + public void setWireFormatOptions(Map<String, Map<String, Object>> wireFormatOptions) { + this.wireFormatOptions = wireFormatOptions; + } + + public void setEnabledProtocols(Set<String> enabledProtocols) { + this.enabledProtocols = enabledProtocols; + } + + public void setAutoTransportOptions(Map<String, Object> autoTransportOptions) { + this.autoTransportOptions = autoTransportOptions; + if (autoTransportOptions.get("protocols") != null) + this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoTransportOptions.get("protocols")); + } + + protected ProtocolInfo detectProtocol(byte[] buffer) throws IOException { + TcpTransportFactory detectedTransportFactory = transportFactory; + WireFormatFactory detectedWireFormatFactory = wireFormatFactory; + + boolean found = false; + for (String scheme : protocolVerifiers.keySet()) { + if (protocolVerifiers.get(scheme).isProtocol(buffer)) { + LOG.debug("Detected " + scheme); + detectedWireFormatFactory = findWireFormatFactory(scheme, wireFormatOptions); + + if (scheme.equals("default")) { + scheme = ""; + } + + detectedTransportFactory = (TcpTransportFactory) findTransportFactory(scheme, transportOptions); + found = true; + break; + } + } + + if (!found) { + throw new IllegalStateException("Could not detect wire format"); + } + + return new ProtocolInfo(detectedTransportFactory, detectedWireFormatFactory); + + } + + protected class ProtocolInfo { + public final TcpTransportFactory detectedTransportFactory; + public final WireFormatFactory detectedWireFormatFactory; + + public ProtocolInfo(TcpTransportFactory detectedTransportFactory, + WireFormatFactory detectedWireFormatFactory) { + super(); + this.detectedTransportFactory = detectedTransportFactory; + this.detectedWireFormatFactory = detectedWireFormatFactory; + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTransportUtils.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTransportUtils.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTransportUtils.java new file mode 100644 index 0000000..453292d --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/AutoTransportUtils.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.transport.auto; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.activemq.util.IntrospectionSupport; + +/** + * + * + */ +public class AutoTransportUtils { + + //wireformats + public static String ALL = "all"; + public static String OPENWIRE = "default"; + public static String STOMP = "stomp"; + public static String AMQP = "amqp"; + public static String MQTT = "mqtt"; + + //transports + public static String AUTO = "auto"; + + public static Map<String, Map<String, Object>> extractWireFormatOptions(Map<String, String> options ) { + Map<String, Map<String, Object>> wireFormatOptions = new HashMap<>(); + if (options != null) { + wireFormatOptions.put(OPENWIRE, IntrospectionSupport.extractProperties(options, "wireFormat.default.")); + wireFormatOptions.put(STOMP, IntrospectionSupport.extractProperties(options, "wireFormat.stomp.")); + wireFormatOptions.put(AMQP, IntrospectionSupport.extractProperties(options, "wireFormat.amqp.")); + wireFormatOptions.put(MQTT, IntrospectionSupport.extractProperties(options, "wireFormat.mqtt.")); + wireFormatOptions.put(ALL, IntrospectionSupport.extractProperties(options, "wireFormat.")); + } + return wireFormatOptions; + } + + public static Set<String> parseProtocols(String protocolString) { + Set<String> protocolSet = new HashSet<>();; + if (protocolString != null && !protocolString.isEmpty()) { + protocolSet.addAll(Arrays.asList(protocolString.split(","))); + } + return protocolSet; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOSSLTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOSSLTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOSSLTransportServer.java new file mode 100644 index 0000000..7ac3a97 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOSSLTransportServer.java @@ -0,0 +1,122 @@ +package org.apache.activemq.broker.transport.auto.nio; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.Set; + +import javax.net.ServerSocketFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.broker.transport.auto.AutoTcpTransportServer; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.nio.AutoInitNioSSLTransport; +import org.apache.activemq.transport.nio.NIOSSLTransport; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransportServer; +import org.apache.activemq.wireformat.WireFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AutoNIOSSLTransportServer extends AutoTcpTransportServer { + + private static final Logger LOG = LoggerFactory.getLogger(AutoNIOSSLTransportServer.class); + + private SSLContext context; + + public AutoNIOSSLTransportServer(SSLContext context, TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory, + BrokerService brokerService, Set<String> enabledProtocols) throws IOException, URISyntaxException { + super(transportFactory, location, serverSocketFactory, brokerService, enabledProtocols); + + this.context = context; + } + + private boolean needClientAuth; + private boolean wantClientAuth; + + protected Transport createTransport(Socket socket, WireFormat format, SSLEngine engine, + InitBuffer initBuffer, ByteBuffer inputBuffer, TcpTransportFactory detectedFactory) throws IOException { + NIOSSLTransport transport = new NIOSSLTransport(format, socket, engine, initBuffer, inputBuffer); + if (context != null) { + transport.setSslContext(context); + } + + transport.setNeedClientAuth(needClientAuth); + transport.setWantClientAuth(wantClientAuth); + + + return transport; + } + + @Override + protected TcpTransport createTransport(Socket socket, WireFormat format) throws IOException { + throw new UnsupportedOperationException("method not supported"); + } + + @Override + public boolean isSslServer() { + return true; + } + + public boolean isNeedClientAuth() { + return this.needClientAuth; + } + + public void setNeedClientAuth(boolean value) { + this.needClientAuth = value; + } + + public boolean isWantClientAuth() { + return this.wantClientAuth; + } + + public void setWantClientAuth(boolean value) { + this.wantClientAuth = value; + } + + + @Override + protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception { + + //The SSLEngine needs to be initialized and handshake done to get the first command and detect the format + AutoInitNioSSLTransport in = new AutoInitNioSSLTransport(wireFormatFactory.createWireFormat(), socket); + if (context != null) { + in.setSslContext(context); + } + in.start(); + SSLEngine engine = in.getSslSession(); + + //Wait for handshake to finish initializing + byte[] read = null; + do { + in.serviceRead(); + } while((read = in.read) == null); + + in.stop(); + + initBuffer = new InitBuffer(in.readSize, ByteBuffer.allocate(read.length)); + initBuffer.buffer.put(read); + + ProtocolInfo protocolInfo = detectProtocol(read); + + if (protocolInfo.detectedTransportFactory instanceof BrokerServiceAware) { + ((BrokerServiceAware) protocolInfo.detectedTransportFactory).setBrokerService(brokerService); + } + + WireFormat format = protocolInfo.detectedWireFormatFactory.createWireFormat(); + Transport transport = createTransport(socket, format, engine, initBuffer, in.getInputBuffer(), protocolInfo.detectedTransportFactory); + + return new TransportInfo(format, transport, protocolInfo.detectedTransportFactory); + } + + +} + + http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOTransport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOTransport.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOTransport.java new file mode 100644 index 0000000..e1b6e71 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNIOTransport.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.transport.auto.nio; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.UnknownHostException; + +import javax.net.SocketFactory; + +import org.apache.activemq.transport.nio.NIOTransport; +import org.apache.activemq.wireformat.WireFormat; + +/** + * + * + */ +public class AutoNIOTransport extends NIOTransport { + + public AutoNIOTransport(WireFormat format, Socket socket, + InitBuffer initBuffer) throws IOException { + super(format, socket, initBuffer); + } + + public AutoNIOTransport(WireFormat wireFormat, Socket socket) + throws IOException { + super(wireFormat, socket); + } + + public AutoNIOTransport(WireFormat wireFormat, SocketFactory socketFactory, + URI remoteLocation, URI localLocation) throws UnknownHostException, + IOException { + super(wireFormat, socketFactory, remoteLocation, localLocation); + } + + + boolean doneInitBuffer = false; + + /** + * Read from the initial buffer if it is set + */ + @Override + protected int readFromBuffer() throws IOException { + int readSize = 0; + if (!doneInitBuffer) { + if (initBuffer == null) { + throw new IOException("Null initBuffer"); + } + if (nextFrameSize == -1) { + readSize = 4; + this.initBuffer.buffer.flip(); + for (int i = 0; i < 4; i++) { + currentBuffer.put(initBuffer.buffer.get()); + } + } else { + for (int i = 0; i < 4; i++) { + currentBuffer.put(initBuffer.buffer.get()); + } + readSize = 4; + doneInitBuffer = true; + } + + } else { + readSize += channel.read(currentBuffer); + } + return readSize; + } + + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioSslTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioSslTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioSslTransportFactory.java new file mode 100644 index 0000000..8142a2a --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioSslTransportFactory.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.transport.auto.nio; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import javax.net.ServerSocketFactory; +import javax.net.ssl.SSLEngine; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.broker.SslContext; +import org.apache.activemq.broker.transport.auto.AutoTcpTransportServer; +import org.apache.activemq.broker.transport.auto.AutoTransportUtils; +import org.apache.activemq.openwire.OpenWireFormatFactory; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.nio.NIOSSLTransport; +import org.apache.activemq.transport.nio.NIOSSLTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.URISupport; +import org.apache.activemq.wireformat.WireFormat; + +/** + * + * + */ +public class AutoNioSslTransportFactory extends NIOSSLTransportFactory implements BrokerServiceAware { + protected BrokerService brokerService; + + /* (non-Javadoc) + * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService) + */ + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } + + @Override + protected AutoNIOSSLTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + return new AutoNIOSSLTransportServer(context, this, location, serverSocketFactory, brokerService, enabledProtocols) { + + @Override + protected Transport createTransport(Socket socket, WireFormat format, SSLEngine engine, InitBuffer initBuffer, + ByteBuffer inputBuffer, TcpTransportFactory detectedFactory) throws IOException { + NIOSSLTransport nioSslTransport = (NIOSSLTransport) detectedFactory.createTransport( + format, socket, engine, initBuffer, inputBuffer); + + if (format.getClass().toString().contains("MQTT")) { + if (!allowLinkStealingSet) { + this.setAllowLinkStealing(true); + } + } + + if (context != null) { + nioSslTransport.setSslContext(context); + } + + nioSslTransport.setNeedClientAuth(isNeedClientAuth()); + nioSslTransport.setWantClientAuth(isWantClientAuth()); + + return nioSslTransport; + } + + }; + + } + + boolean allowLinkStealingSet = false; + private Set<String> enabledProtocols; + + @Override + public TransportServer doBind(final URI location) throws IOException { + try { + if (SslContext.getCurrentSslContext() != null) { + try { + context = SslContext.getCurrentSslContext().getSSLContext(); + } catch (Exception e) { + throw new IOException(e); + } + } + + Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); + + Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto."); + this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols")); + + ServerSocketFactory serverSocketFactory = createServerSocketFactory(); + AutoTcpTransportServer server = createTcpTransportServer(location, serverSocketFactory); + server.setWireFormatFactory(new OpenWireFormatFactory()); + if (options.get("allowLinkStealing") != null){ + allowLinkStealingSet = true; + } + IntrospectionSupport.setProperties(server, options); + server.setAutoTransportOptions(IntrospectionSupport.extractProperties(options, "auto.")); + server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport.")); + server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options)); + server.bind(); + + return server; + } catch (URISyntaxException e) { + throw IOExceptionSupport.create(e); + } + } + + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioTransportFactory.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioTransportFactory.java new file mode 100644 index 0000000..0922ae6 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/auto/nio/AutoNioTransportFactory.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.transport.auto.nio; + +import java.io.IOException; +import java.net.Socket; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import javax.net.ServerSocketFactory; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.BrokerServiceAware; +import org.apache.activemq.broker.transport.auto.AutoTcpTransportServer; +import org.apache.activemq.broker.transport.auto.AutoTransportUtils; +import org.apache.activemq.openwire.OpenWireFormatFactory; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportServer; +import org.apache.activemq.transport.nio.NIOTransport; +import org.apache.activemq.transport.nio.NIOTransportFactory; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.apache.activemq.transport.tcp.TcpTransportFactory; +import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.URISupport; +import org.apache.activemq.wireformat.WireFormat; + +/** + * + * + */ +public class AutoNioTransportFactory extends NIOTransportFactory implements BrokerServiceAware { + protected BrokerService brokerService; + /* (non-Javadoc) + * @see org.apache.activemq.broker.BrokerServiceAware#setBrokerService(org.apache.activemq.broker.BrokerService) + */ + @Override + public void setBrokerService(BrokerService brokerService) { + this.brokerService = brokerService; + } + + @Override + protected AutoTcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { + return new AutoTcpTransportServer(this, location, serverSocketFactory, brokerService, enabledProtocols) { + @Override + protected TcpTransport createTransport(Socket socket, WireFormat format, TcpTransportFactory detectedTransportFactory) throws IOException { + TcpTransport nioTransport = null; + if (detectedTransportFactory.getClass().equals(NIOTransportFactory.class)) { + nioTransport = new AutoNIOTransport(format, socket,this.initBuffer); + } else { + nioTransport = detectedTransportFactory.createTransport( + format, socket, this.initBuffer); + } + + if (format.getClass().toString().contains("MQTT")) { + if (!allowLinkStealingSet) { + this.setAllowLinkStealing(true); + } + } + + return nioTransport; + } + }; + + } + + boolean allowLinkStealingSet = false; + private Set<String> enabledProtocols; + + @Override + public TransportServer doBind(final URI location) throws IOException { + try { + Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location)); + + Map<String, Object> autoProperties = IntrospectionSupport.extractProperties(options, "auto."); + this.enabledProtocols = AutoTransportUtils.parseProtocols((String) autoProperties.get("protocols")); + + ServerSocketFactory serverSocketFactory = createServerSocketFactory(); + AutoTcpTransportServer server = createTcpTransportServer(location, serverSocketFactory); + //server.setWireFormatFactory(createWireFormatFactory(options)); + server.setWireFormatFactory(new OpenWireFormatFactory()); + if (options.get("allowLinkStealing") != null){ + allowLinkStealingSet = true; + } + IntrospectionSupport.setProperties(server, options); + server.setTransportOption(IntrospectionSupport.extractProperties(options, "transport.")); + server.setWireFormatOptions(AutoTransportUtils.extractWireFormatOptions(options)); + server.bind(); + + return server; + } catch (URISyntaxException e) { + throw IOExceptionSupport.create(e); + } + } + + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/AmqpProtocolVerifier.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/AmqpProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/AmqpProtocolVerifier.java new file mode 100644 index 0000000..11f0574 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/AmqpProtocolVerifier.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.transport.protocol; + + +/** + * + * + */ +public class AmqpProtocolVerifier implements ProtocolVerifier { + + static final byte[] PREFIX = new byte[] { 'A', 'M', 'Q', 'P' }; + + @Override + public boolean isProtocol(byte[] value) { + for (int i = 0; i < PREFIX.length; i++) { + if (value[i] != PREFIX[i]) + return false; + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/MqttProtocolVerifier.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/MqttProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/MqttProtocolVerifier.java new file mode 100644 index 0000000..4b4edd2 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/MqttProtocolVerifier.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.transport.protocol; + +/** + * + * + */ +public class MqttProtocolVerifier implements ProtocolVerifier { + + /* (non-Javadoc) + * @see org.apache.activemq.broker.transport.protocol.ProtocolVerifier#isProtocol(byte[]) + */ + @Override + public boolean isProtocol(byte[] value) { + boolean mqtt311 = value[4] == 77 && // M + value[5] == 81 && // Q + value[6] == 84 && // T + value[7] == 84; // T + + boolean mqtt31 = value[4] == 77 && // M + value[5] == 81 && // Q + value[6] == 73 && // I + value[7] == 115; // s + + return mqtt311 || mqtt31; + } + + + +}