ARTEMIS-1511 Refactor AMQP Transport for use with other test clients
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5211afdf Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5211afdf Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5211afdf Branch: refs/heads/master Commit: 5211afdf866fbcb5b538b2d5e2670dd5df385423 Parents: 63b156e Author: Martyn Taylor <mtay...@redhat.com> Authored: Fri Nov 10 12:31:29 2017 +0000 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Mon Nov 13 16:55:47 2017 -0500 ---------------------------------------------------------------------- .../transport/amqp/client/AmqpClient.java | 4 +- .../transport/amqp/client/AmqpConnection.java | 7 +- .../client/transport/NettyTcpTransport.java | 460 ------------------- .../amqp/client/transport/NettyTransport.java | 56 --- .../client/transport/NettyTransportFactory.java | 83 ---- .../transport/NettyTransportListener.java | 48 -- .../client/transport/NettyTransportOptions.java | 208 --------- .../transport/NettyTransportSslOptions.java | 302 ------------ .../client/transport/NettyTransportSupport.java | 304 ------------ .../amqp/client/transport/NettyWSTransport.java | 171 ------- .../client/transport/X509AliasKeyManager.java | 86 ---- .../transport/netty/NettyTcpTransport.java | 460 +++++++++++++++++++ .../transport/netty/NettyTransport.java | 57 +++ .../transport/netty/NettyTransportFactory.java | 82 ++++ .../transport/netty/NettyTransportListener.java | 48 ++ .../transport/netty/NettyTransportOptions.java | 219 +++++++++ .../netty/NettyTransportSslOptions.java | 302 ++++++++++++ .../transport/netty/NettyTransportSupport.java | 304 ++++++++++++ .../transport/netty/NettyWSTransport.java | 172 +++++++ .../transport/netty/X509AliasKeyManager.java | 86 ++++ .../impl/netty/NettyHandshakeTimeoutTest.java | 6 +- 21 files changed, 1739 insertions(+), 1726 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java index fddaf9d..d35d0ab 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java @@ -21,8 +21,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.activemq.transport.amqp.client.transport.NettyTransport; -import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory; +import org.apache.activemq.transport.netty.NettyTransport; +import org.apache.activemq.transport.netty.NettyTransportFactory; import org.apache.qpid.proton.amqp.Symbol; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index 2fc720a..01e2288 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -33,8 +33,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.transport.InactivityIOException; +import org.apache.activemq.transport.netty.NettyTransport; import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator; -import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener; +import org.apache.activemq.transport.netty.NettyTransportListener; import org.apache.activemq.transport.amqp.client.util.AsyncResult; import org.apache.activemq.transport.amqp.client.util.ClientFuture; import org.apache.activemq.transport.amqp.client.util.IdGenerator; @@ -80,7 +81,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements private final AtomicLong sessionIdGenerator = new AtomicLong(); private final AtomicLong txIdGenerator = new AtomicLong(); private final Collector protonCollector = new CollectorImpl(); - private final org.apache.activemq.transport.amqp.client.transport.NettyTransport transport; + private final NettyTransport transport; private final Transport protonTransport = Transport.Factory.create(); private final String username; @@ -109,7 +110,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements private boolean trace; private boolean noContainerID = false; - public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, String password) { + public AmqpConnection(NettyTransport transport, String username, String password) { setEndpoint(Connection.Factory.create()); getEndpoint().collect(protonCollector); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java deleted file mode 100644 index 7ce3bb9..0000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java +++ /dev/null @@ -1,460 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.transport; - -import java.io.IOException; -import java.net.URI; -import java.security.Principal; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.FixedRecvByteBufAllocator; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.logging.LoggingHandler; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; - -/** - * TCP based transport that uses Netty as the underlying IO layer. - */ -public class NettyTcpTransport implements NettyTransport { - - private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class); - - private static final int SHUTDOWN_TIMEOUT = 100; - public static final int DEFAULT_MAX_FRAME_SIZE = 65535; - - protected Bootstrap bootstrap; - protected EventLoopGroup group; - protected Channel channel; - protected NettyTransportListener listener; - protected final NettyTransportOptions options; - protected final URI remote; - protected int maxFrameSize = DEFAULT_MAX_FRAME_SIZE; - - private final AtomicBoolean connected = new AtomicBoolean(); - private final AtomicBoolean closed = new AtomicBoolean(); - private final CountDownLatch connectLatch = new CountDownLatch(1); - private volatile IOException failureCause; - - /** - * Create a new transport instance - * - * @param remoteLocation - * the URI that defines the remote resource to connect to. - * @param options - * the transport options used to configure the socket connection. - */ - public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) { - this(null, remoteLocation, options); - } - - /** - * Create a new transport instance - * - * @param listener - * the TransportListener that will receive events from this Transport. - * @param remoteLocation - * the URI that defines the remote resource to connect to. - * @param options - * the transport options used to configure the socket connection. - */ - public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) { - if (options == null) { - throw new IllegalArgumentException("Transport Options cannot be null"); - } - - if (remoteLocation == null) { - throw new IllegalArgumentException("Transport remote location cannot be null"); - } - - this.options = options; - this.listener = listener; - this.remote = remoteLocation; - } - - @Override - public void connect() throws IOException { - - if (listener == null) { - throw new IllegalStateException("A transport listener must be set before connection attempts."); - } - - final SslHandler sslHandler; - if (isSSL()) { - try { - sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); - } catch (Exception ex) { - // TODO: can we stop it throwing Exception? - throw IOExceptionSupport.create(ex); - } - } else { - sslHandler = null; - } - - group = new NioEventLoopGroup(1); - - bootstrap = new Bootstrap(); - bootstrap.group(group); - bootstrap.channel(NioSocketChannel.class); - bootstrap.handler(new ChannelInitializer<Channel>() { - @Override - public void initChannel(Channel connectedChannel) throws Exception { - configureChannel(connectedChannel, sslHandler); - } - }); - - configureNetty(bootstrap, getTransportOptions()); - - ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort()); - future.addListener(new ChannelFutureListener() { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - handleException(future.channel(), IOExceptionSupport.create(future.cause())); - } - } - }); - - try { - connectLatch.await(); - } catch (InterruptedException ex) { - LOG.debug("Transport connection was interrupted."); - Thread.interrupted(); - failureCause = IOExceptionSupport.create(ex); - } - - if (failureCause != null) { - // Close out any Netty resources now as they are no longer needed. - if (channel != null) { - channel.close().syncUninterruptibly(); - channel = null; - } - if (group != null) { - Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); - if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { - LOG.trace("Channel group shutdown failed to complete in allotted time"); - } - group = null; - } - - throw failureCause; - } else { - // Connected, allow any held async error to fire now and close the transport. - channel.eventLoop().execute(new Runnable() { - - @Override - public void run() { - if (failureCause != null) { - channel.pipeline().fireExceptionCaught(failureCause); - } - } - }); - } - } - - @Override - public boolean isConnected() { - return connected.get(); - } - - @Override - public boolean isSSL() { - return options.isSSL(); - } - - @Override - public void close() throws IOException { - if (closed.compareAndSet(false, true)) { - connected.set(false); - try { - if (channel != null) { - channel.close().syncUninterruptibly(); - } - } finally { - if (group != null) { - Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); - if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { - LOG.trace("Channel group shutdown failed to complete in allotted time"); - } - } - } - } - } - - @Override - public ByteBuf allocateSendBuffer(int size) throws IOException { - checkConnected(); - return channel.alloc().ioBuffer(size, size); - } - - @Override - public void send(ByteBuf output) throws IOException { - checkConnected(); - int length = output.readableBytes(); - if (length == 0) { - return; - } - - LOG.trace("Attempted write of: {} bytes", length); - - channel.writeAndFlush(output); - } - - @Override - public NettyTransportListener getTransportListener() { - return listener; - } - - @Override - public void setTransportListener(NettyTransportListener listener) { - this.listener = listener; - } - - @Override - public NettyTransportOptions getTransportOptions() { - return options; - } - - @Override - public URI getRemoteLocation() { - return remote; - } - - @Override - public Principal getLocalPrincipal() { - Principal result = null; - - if (isSSL()) { - SslHandler sslHandler = channel.pipeline().get(SslHandler.class); - result = sslHandler.engine().getSession().getLocalPrincipal(); - } - - return result; - } - - @Override - public void setMaxFrameSize(int maxFrameSize) { - if (connected.get()) { - throw new IllegalStateException("Cannot change Max Frame Size while connected."); - } - - this.maxFrameSize = maxFrameSize; - } - - @Override - public int getMaxFrameSize() { - return maxFrameSize; - } - - // ----- Internal implementation details, can be overridden as needed -----// - - protected String getRemoteHost() { - return remote.getHost(); - } - - protected int getRemotePort() { - if (remote.getPort() != -1) { - return remote.getPort(); - } else { - return isSSL() ? getSslOptions().getDefaultSslPort() : getTransportOptions().getDefaultTcpPort(); - } - } - - protected void addAdditionalHandlers(ChannelPipeline pipeline) { - - } - - protected ChannelInboundHandlerAdapter createChannelHandler() { - return new NettyTcpTransportHandler(); - } - - // ----- Event Handlers which can be overridden in subclasses -------------// - - protected void handleConnected(Channel channel) throws Exception { - LOG.trace("Channel has become active! Channel is {}", channel); - connectionEstablished(channel); - } - - protected void handleChannelInactive(Channel channel) throws Exception { - LOG.trace("Channel has gone inactive! Channel is {}", channel); - if (connected.compareAndSet(true, false) && !closed.get()) { - LOG.trace("Firing onTransportClosed listener"); - listener.onTransportClosed(); - } - } - - protected void handleException(Channel channel, Throwable cause) throws Exception { - LOG.trace("Exception on channel! Channel is {}", channel); - if (connected.compareAndSet(true, false) && !closed.get()) { - LOG.trace("Firing onTransportError listener"); - if (failureCause != null) { - listener.onTransportError(failureCause); - } else { - listener.onTransportError(cause); - } - } else { - // Hold the first failure for later dispatch if connect succeeds. - // This will then trigger disconnect using the first error reported. - if (failureCause == null) { - LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); - failureCause = IOExceptionSupport.create(cause); - } - - connectionFailed(channel, failureCause); - } - } - - // ----- State change handlers and checks ---------------------------------// - - protected final void checkConnected() throws IOException { - if (!connected.get()) { - throw new IOException("Cannot send to a non-connected transport."); - } - } - - /* - * Called when the transport has successfully connected and is ready for use. - */ - private void connectionEstablished(Channel connectedChannel) { - channel = connectedChannel; - connected.set(true); - connectLatch.countDown(); - } - - /* - * Called when the transport connection failed and an error should be returned. - */ - private void connectionFailed(Channel failedChannel, IOException cause) { - failureCause = cause; - channel = failedChannel; - connected.set(false); - connectLatch.countDown(); - } - - private NettyTransportSslOptions getSslOptions() { - return (NettyTransportSslOptions) getTransportOptions(); - } - - private void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) { - bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); - bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); - bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); - bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger()); - - if (options.getSendBufferSize() != -1) { - bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); - } - - if (options.getReceiveBufferSize() != -1) { - bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize()); - bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize())); - } - - if (options.getTrafficClass() != -1) { - bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass()); - } - } - - private void configureChannel(final Channel channel, final SslHandler sslHandler) throws Exception { - if (isSSL()) { - channel.pipeline().addLast(sslHandler); - } - - if (getTransportOptions().isTraceBytes()) { - channel.pipeline().addLast("logger", new LoggingHandler(getClass())); - } - - addAdditionalHandlers(channel.pipeline()); - - channel.pipeline().addLast(createChannelHandler()); - } - - // ----- Handle connection events -----------------------------------------// - - protected abstract class NettyDefaultHandler<E> extends SimpleChannelInboundHandler<E> { - - @Override - public void channelRegistered(ChannelHandlerContext context) throws Exception { - channel = context.channel(); - } - - @Override - public void channelActive(ChannelHandlerContext context) throws Exception { - // In the Secure case we need to let the handshake complete before we - // trigger the connected event. - if (!isSSL()) { - handleConnected(context.channel()); - } else { - SslHandler sslHandler = context.pipeline().get(SslHandler.class); - sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() { - @Override - public void operationComplete(Future<Channel> future) throws Exception { - if (future.isSuccess()) { - LOG.trace("SSL Handshake has completed: {}", channel); - handleConnected(channel); - } else { - LOG.trace("SSL Handshake has failed: {}", channel); - handleException(channel, future.cause()); - } - } - }); - } - } - - @Override - public void channelInactive(ChannelHandlerContext context) throws Exception { - handleChannelInactive(context.channel()); - } - - @Override - public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { - handleException(context.channel(), cause); - } - } - - // ----- Handle Binary data from connection -------------------------------// - - protected class NettyTcpTransportHandler extends NettyDefaultHandler<ByteBuf> { - - @Override - protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { - LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer); - listener.onData(buffer); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java deleted file mode 100644 index 4d5a389..0000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.transport; - -import java.io.IOException; -import java.net.URI; -import java.security.Principal; - -import io.netty.buffer.ByteBuf; - -/** - * Base for all Netty based Transports in this client. - */ -public interface NettyTransport { - - void connect() throws IOException; - - boolean isConnected(); - - boolean isSSL(); - - void close() throws IOException; - - ByteBuf allocateSendBuffer(int size) throws IOException; - - void send(ByteBuf output) throws IOException; - - NettyTransportListener getTransportListener(); - - void setTransportListener(NettyTransportListener listener); - - NettyTransportOptions getTransportOptions(); - - URI getRemoteLocation(); - - Principal getLocalPrincipal(); - - void setMaxFrameSize(int maxFrameSize); - - int getMaxFrameSize(); - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java deleted file mode 100644 index 30b2e21..0000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.transport; - -import java.net.URI; -import java.util.Map; - -import org.apache.activemq.transport.amqp.client.util.PropertyUtil; - -/** - * Factory for creating the Netty based TCP Transport. - */ -public final class NettyTransportFactory { - - private NettyTransportFactory() { - } - - /** - * Creates an instance of the given Transport and configures it using the properties set on - * the given remote broker URI. - * - * @param remoteURI - * The URI used to connect to a remote Peer. - * - * @return a new Transport instance. - * - * @throws Exception - * if an error occurs while creating the Transport instance. - */ - public static NettyTransport createTransport(URI remoteURI) throws Exception { - Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery()); - Map<String, String> transportURIOptions = PropertyUtil.filterProperties(map, "transport."); - NettyTransportOptions transportOptions = null; - - remoteURI = PropertyUtil.replaceQuery(remoteURI, map); - - if (!remoteURI.getScheme().equalsIgnoreCase("ssl") && !remoteURI.getScheme().equalsIgnoreCase("wss")) { - transportOptions = NettyTransportOptions.INSTANCE.clone(); - } else { - transportOptions = NettyTransportSslOptions.INSTANCE.clone(); - } - - Map<String, String> unused = PropertyUtil.setProperties(transportOptions, transportURIOptions); - if (!unused.isEmpty()) { - String msg = " Not all transport options could be set on the TCP based" + - " Transport. Check the options are spelled correctly." + - " Unused parameters=[" + unused + "]." + - " This provider instance cannot be started."; - throw new IllegalArgumentException(msg); - } - - NettyTransport result = null; - - switch (remoteURI.getScheme().toLowerCase()) { - case "tcp": - case "ssl": - result = new NettyTcpTransport(remoteURI, transportOptions); - break; - case "ws": - case "wss": - result = new NettyWSTransport(remoteURI, transportOptions); - break; - default: - throw new IllegalArgumentException("Invalid URI Scheme: " + remoteURI.getScheme()); - } - - return result; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java deleted file mode 100644 index 0163517..0000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.transport; - -import io.netty.buffer.ByteBuf; - -/** - * Listener interface that should be implemented by users of the various QpidJMS Transport - * classes. - */ -public interface NettyTransportListener { - - /** - * Called when new incoming data has become available. - * - * @param incoming - * the next incoming packet of data. - */ - void onData(ByteBuf incoming); - - /** - * Called if the connection state becomes closed. - */ - void onTransportClosed(); - - /** - * Called when an error occurs during normal Transport operations. - * - * @param cause - * the error that triggered this event. - */ - void onTransportError(Throwable cause); - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java deleted file mode 100644 index c5022c1..0000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.transport; - -/** - * Encapsulates all the TCP Transport options in one configuration object. - */ -public class NettyTransportOptions implements Cloneable { - - public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024; - public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE; - public static final int DEFAULT_TRAFFIC_CLASS = 0; - public static final boolean DEFAULT_TCP_NO_DELAY = true; - public static final boolean DEFAULT_TCP_KEEP_ALIVE = false; - public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE; - public static final int DEFAULT_SO_TIMEOUT = -1; - public static final int DEFAULT_CONNECT_TIMEOUT = 60000; - public static final int DEFAULT_TCP_PORT = 5672; - public static final boolean DEFAULT_TRACE_BYTES = false; - - public static final NettyTransportOptions INSTANCE = new NettyTransportOptions(); - - private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE; - private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE; - private int trafficClass = DEFAULT_TRAFFIC_CLASS; - private int connectTimeout = DEFAULT_CONNECT_TIMEOUT; - private int soTimeout = DEFAULT_SO_TIMEOUT; - private int soLinger = DEFAULT_SO_LINGER; - private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE; - private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY; - private int defaultTcpPort = DEFAULT_TCP_PORT; - private boolean traceBytes = DEFAULT_TRACE_BYTES; - - /** - * @return the currently set send buffer size in bytes. - */ - public int getSendBufferSize() { - return sendBufferSize; - } - - /** - * Sets the send buffer size in bytes, the value must be greater than zero or an - * {@link IllegalArgumentException} will be thrown. - * - * @param sendBufferSize - * the new send buffer size for the TCP Transport. - * - * @throws IllegalArgumentException - * if the value given is not in the valid range. - */ - public void setSendBufferSize(int sendBufferSize) { - if (sendBufferSize <= 0) { - throw new IllegalArgumentException("The send buffer size must be > 0"); - } - - this.sendBufferSize = sendBufferSize; - } - - /** - * @return the currently configured receive buffer size in bytes. - */ - public int getReceiveBufferSize() { - return receiveBufferSize; - } - - /** - * Sets the receive buffer size in bytes, the value must be greater than zero or an - * {@link IllegalArgumentException} will be thrown. - * - * @param receiveBufferSize - * the new receive buffer size for the TCP Transport. - * - * @throws IllegalArgumentException - * if the value given is not in the valid range. - */ - public void setReceiveBufferSize(int receiveBufferSize) { - if (receiveBufferSize <= 0) { - throw new IllegalArgumentException("The send buffer size must be > 0"); - } - - this.receiveBufferSize = receiveBufferSize; - } - - /** - * @return the currently configured traffic class value. - */ - public int getTrafficClass() { - return trafficClass; - } - - /** - * Sets the traffic class value used by the TCP connection, valid range is between 0 and 255. - * - * @param trafficClass - * the new traffic class value. - * - * @throws IllegalArgumentException - * if the value given is not in the valid range. - */ - public void setTrafficClass(int trafficClass) { - if (trafficClass < 0 || trafficClass > 255) { - throw new IllegalArgumentException("Traffic class must be in the range [0..255]"); - } - - this.trafficClass = trafficClass; - } - - public int getSoTimeout() { - return soTimeout; - } - - public void setSoTimeout(int soTimeout) { - this.soTimeout = soTimeout; - } - - public boolean isTcpNoDelay() { - return tcpNoDelay; - } - - public void setTcpNoDelay(boolean tcpNoDelay) { - this.tcpNoDelay = tcpNoDelay; - } - - public int getSoLinger() { - return soLinger; - } - - public void setSoLinger(int soLinger) { - this.soLinger = soLinger; - } - - public boolean isTcpKeepAlive() { - return tcpKeepAlive; - } - - public void setTcpKeepAlive(boolean keepAlive) { - this.tcpKeepAlive = keepAlive; - } - - public int getConnectTimeout() { - return connectTimeout; - } - - public void setConnectTimeout(int connectTimeout) { - this.connectTimeout = connectTimeout; - } - - public int getDefaultTcpPort() { - return defaultTcpPort; - } - - public void setDefaultTcpPort(int defaultTcpPort) { - this.defaultTcpPort = defaultTcpPort; - } - - /** - * @return true if the transport should enable byte tracing - */ - public boolean isTraceBytes() { - return traceBytes; - } - - /** - * Determines if the transport should add a logger for bytes in / out - * - * @param traceBytes - * should the transport log the bytes in and out. - */ - public void setTraceBytes(boolean traceBytes) { - this.traceBytes = traceBytes; - } - - public boolean isSSL() { - return false; - } - - @Override - public NettyTransportOptions clone() { - return copyOptions(new NettyTransportOptions()); - } - - protected NettyTransportOptions copyOptions(NettyTransportOptions copy) { - copy.setConnectTimeout(getConnectTimeout()); - copy.setReceiveBufferSize(getReceiveBufferSize()); - copy.setSendBufferSize(getSendBufferSize()); - copy.setSoLinger(getSoLinger()); - copy.setSoTimeout(getSoTimeout()); - copy.setTcpKeepAlive(isTcpKeepAlive()); - copy.setTcpNoDelay(isTcpNoDelay()); - copy.setTrafficClass(getTrafficClass()); - - return copy; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java deleted file mode 100644 index 3289fce..0000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.transport; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -/** - * Holds the defined SSL options for connections that operate over a secure transport. Options - * are read from the environment and can be overridden by specifying them on the connection URI. - */ -public class NettyTransportSslOptions extends NettyTransportOptions { - - public static final String DEFAULT_STORE_TYPE = "jks"; - public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS"; - public static final boolean DEFAULT_TRUST_ALL = false; - public static final boolean DEFAULT_VERIFY_HOST = false; - public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[] {"SSLv2Hello", "SSLv3"})); - public static final int DEFAULT_SSL_PORT = 5671; - - public static final NettyTransportSslOptions INSTANCE = new NettyTransportSslOptions(); - - private String keyStoreLocation; - private String keyStorePassword; - private String trustStoreLocation; - private String trustStorePassword; - private String storeType = DEFAULT_STORE_TYPE; - private String[] enabledCipherSuites; - private String[] disabledCipherSuites; - private String[] enabledProtocols; - private String[] disabledProtocols = DEFAULT_DISABLED_PROTOCOLS.toArray(new String[0]); - private String contextProtocol = DEFAULT_CONTEXT_PROTOCOL; - - private boolean trustAll = DEFAULT_TRUST_ALL; - private boolean verifyHost = DEFAULT_VERIFY_HOST; - private String keyAlias; - private int defaultSslPort = DEFAULT_SSL_PORT; - - static { - INSTANCE.setKeyStoreLocation(System.getProperty("javax.net.ssl.keyStore")); - INSTANCE.setKeyStorePassword(System.getProperty("javax.net.ssl.keyStorePassword")); - INSTANCE.setTrustStoreLocation(System.getProperty("javax.net.ssl.trustStore")); - INSTANCE.setTrustStorePassword(System.getProperty("javax.net.ssl.keyStorePassword")); - } - - /** - * @return the keyStoreLocation currently configured. - */ - public String getKeyStoreLocation() { - return keyStoreLocation; - } - - /** - * Sets the location on disk of the key store to use. - * - * @param keyStoreLocation - * the keyStoreLocation to use to create the key manager. - */ - public void setKeyStoreLocation(String keyStoreLocation) { - this.keyStoreLocation = keyStoreLocation; - } - - /** - * @return the keyStorePassword - */ - public String getKeyStorePassword() { - return keyStorePassword; - } - - /** - * @param keyStorePassword - * the keyStorePassword to set - */ - public void setKeyStorePassword(String keyStorePassword) { - this.keyStorePassword = keyStorePassword; - } - - /** - * @return the trustStoreLocation - */ - public String getTrustStoreLocation() { - return trustStoreLocation; - } - - /** - * @param trustStoreLocation - * the trustStoreLocation to set - */ - public void setTrustStoreLocation(String trustStoreLocation) { - this.trustStoreLocation = trustStoreLocation; - } - - /** - * @return the trustStorePassword - */ - public String getTrustStorePassword() { - return trustStorePassword; - } - - /** - * @param trustStorePassword - * the trustStorePassword to set - */ - public void setTrustStorePassword(String trustStorePassword) { - this.trustStorePassword = trustStorePassword; - } - - /** - * @return the storeType - */ - public String getStoreType() { - return storeType; - } - - /** - * @param storeType - * the format that the store files are encoded in. - */ - public void setStoreType(String storeType) { - this.storeType = storeType; - } - - /** - * @return the enabledCipherSuites - */ - public String[] getEnabledCipherSuites() { - return enabledCipherSuites; - } - - /** - * @param enabledCipherSuites - * the enabledCipherSuites to set - */ - public void setEnabledCipherSuites(String[] enabledCipherSuites) { - this.enabledCipherSuites = enabledCipherSuites; - } - - /** - * @return the disabledCipherSuites - */ - public String[] getDisabledCipherSuites() { - return disabledCipherSuites; - } - - /** - * @param disabledCipherSuites - * the disabledCipherSuites to set - */ - public void setDisabledCipherSuites(String[] disabledCipherSuites) { - this.disabledCipherSuites = disabledCipherSuites; - } - - /** - * @return the enabledProtocols or null if the defaults should be used - */ - public String[] getEnabledProtocols() { - return enabledProtocols; - } - - /** - * The protocols to be set as enabled. - * - * @param enabledProtocols - * the enabled protocols to set, or null if the defaults should be used. - */ - public void setEnabledProtocols(String[] enabledProtocols) { - this.enabledProtocols = enabledProtocols; - } - - /** - * - * @return the protocols to disable or null if none should be - */ - public String[] getDisabledProtocols() { - return disabledProtocols; - } - - /** - * The protocols to be disable. - * - * @param disabledProtocols - * the protocols to disable, or null if none should be. - */ - public void setDisabledProtocols(String[] disabledProtocols) { - this.disabledProtocols = disabledProtocols; - } - - /** - * @return the context protocol to use - */ - public String getContextProtocol() { - return contextProtocol; - } - - /** - * The protocol value to use when creating an SSLContext via - * SSLContext.getInstance(protocol). - * - * @param contextProtocol - * the context protocol to use. - */ - public void setContextProtocol(String contextProtocol) { - this.contextProtocol = contextProtocol; - } - - /** - * @return the trustAll - */ - public boolean isTrustAll() { - return trustAll; - } - - /** - * @param trustAll - * the trustAll to set - */ - public void setTrustAll(boolean trustAll) { - this.trustAll = trustAll; - } - - /** - * @return the verifyHost - */ - public boolean isVerifyHost() { - return verifyHost; - } - - /** - * @param verifyHost - * the verifyHost to set - */ - public void setVerifyHost(boolean verifyHost) { - this.verifyHost = verifyHost; - } - - /** - * @return the key alias - */ - public String getKeyAlias() { - return keyAlias; - } - - /** - * @param keyAlias - * the key alias to use - */ - public void setKeyAlias(String keyAlias) { - this.keyAlias = keyAlias; - } - - public int getDefaultSslPort() { - return defaultSslPort; - } - - public void setDefaultSslPort(int defaultSslPort) { - this.defaultSslPort = defaultSslPort; - } - - @Override - public boolean isSSL() { - return true; - } - - @Override - public NettyTransportSslOptions clone() { - return copyOptions(new NettyTransportSslOptions()); - } - - protected NettyTransportSslOptions copyOptions(NettyTransportSslOptions copy) { - super.copyOptions(copy); - - copy.setKeyStoreLocation(getKeyStoreLocation()); - copy.setKeyStorePassword(getKeyStorePassword()); - copy.setTrustStoreLocation(getTrustStoreLocation()); - copy.setTrustStorePassword(getTrustStorePassword()); - copy.setStoreType(getStoreType()); - copy.setEnabledCipherSuites(getEnabledCipherSuites()); - copy.setDisabledCipherSuites(getDisabledCipherSuites()); - copy.setEnabledProtocols(getEnabledProtocols()); - copy.setDisabledProtocols(getDisabledProtocols()); - copy.setTrustAll(isTrustAll()); - copy.setVerifyHost(isVerifyHost()); - copy.setKeyAlias(getKeyAlias()); - copy.setContextProtocol(getContextProtocol()); - return copy; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java deleted file mode 100644 index d41c669..0000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.transport; - -import java.io.File; -import java.io.FileInputStream; -import java.io.InputStream; -import java.net.URI; -import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.SecureRandom; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import javax.net.ssl.KeyManager; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLParameters; -import javax.net.ssl.TrustManager; -import javax.net.ssl.TrustManagerFactory; -import javax.net.ssl.X509ExtendedKeyManager; -import javax.net.ssl.X509TrustManager; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.netty.handler.ssl.SslHandler; - -/** - * Static class that provides various utility methods used by Transport implementations. - */ -public class NettyTransportSupport { - - private static final Logger LOG = LoggerFactory.getLogger(NettyTransportSupport.class); - - /** - * Creates a Netty SslHandler instance for use in Transports that require an SSL encoder / - * decoder. - * - * @param remote - * The URI of the remote peer that the SslHandler will be used against. - * @param options - * The SSL options object to build the SslHandler instance from. - * - * @return a new SslHandler that is configured from the given options. - * - * @throws Exception - * if an error occurs while creating the SslHandler instance. - */ - public static SslHandler createSslHandler(URI remote, NettyTransportSslOptions options) throws Exception { - return new SslHandler(createSslEngine(remote, createSslContext(options), options)); - } - - /** - * Create a new SSLContext using the options specific in the given TransportSslOptions - * instance. - * - * @param options - * the configured options used to create the SSLContext. - * - * @return a new SSLContext instance. - * - * @throws Exception - * if an error occurs while creating the context. - */ - public static SSLContext createSslContext(NettyTransportSslOptions options) throws Exception { - try { - String contextProtocol = options.getContextProtocol(); - LOG.trace("Getting SSLContext instance using protocol: {}", contextProtocol); - - SSLContext context = SSLContext.getInstance(contextProtocol); - KeyManager[] keyMgrs = loadKeyManagers(options); - TrustManager[] trustManagers = loadTrustManagers(options); - - context.init(keyMgrs, trustManagers, new SecureRandom()); - return context; - } catch (Exception e) { - LOG.error("Failed to create SSLContext: {}", e, e); - throw e; - } - } - - /** - * Create a new SSLEngine instance in client mode from the given SSLContext and - * TransportSslOptions instances. - * - * @param context - * the SSLContext to use when creating the engine. - * @param options - * the TransportSslOptions to use to configure the new SSLEngine. - * - * @return a new SSLEngine instance in client mode. - * - * @throws Exception - * if an error occurs while creating the new SSLEngine. - */ - public static SSLEngine createSslEngine(SSLContext context, NettyTransportSslOptions options) throws Exception { - return createSslEngine(null, context, options); - } - - /** - * Create a new SSLEngine instance in client mode from the given SSLContext and - * TransportSslOptions instances. - * - * @param remote - * the URI of the remote peer that will be used to initialize the engine, may be null - * if none should. - * @param context - * the SSLContext to use when creating the engine. - * @param options - * the TransportSslOptions to use to configure the new SSLEngine. - * - * @return a new SSLEngine instance in client mode. - * - * @throws Exception - * if an error occurs while creating the new SSLEngine. - */ - public static SSLEngine createSslEngine(URI remote, SSLContext context, NettyTransportSslOptions options) throws Exception { - SSLEngine engine = null; - if (remote == null) { - engine = context.createSSLEngine(); - } else { - engine = context.createSSLEngine(remote.getHost(), remote.getPort()); - } - - engine.setEnabledProtocols(buildEnabledProtocols(engine, options)); - engine.setEnabledCipherSuites(buildEnabledCipherSuites(engine, options)); - engine.setUseClientMode(true); - - if (options.isVerifyHost()) { - SSLParameters sslParameters = engine.getSSLParameters(); - sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); - engine.setSSLParameters(sslParameters); - } - - return engine; - } - - private static String[] buildEnabledProtocols(SSLEngine engine, NettyTransportSslOptions options) { - List<String> enabledProtocols = new ArrayList<>(); - - if (options.getEnabledProtocols() != null) { - List<String> configuredProtocols = Arrays.asList(options.getEnabledProtocols()); - LOG.trace("Configured protocols from transport options: {}", configuredProtocols); - enabledProtocols.addAll(configuredProtocols); - } else { - List<String> engineProtocols = Arrays.asList(engine.getEnabledProtocols()); - LOG.trace("Default protocols from the SSLEngine: {}", engineProtocols); - enabledProtocols.addAll(engineProtocols); - } - - String[] disabledProtocols = options.getDisabledProtocols(); - if (disabledProtocols != null) { - List<String> disabled = Arrays.asList(disabledProtocols); - LOG.trace("Disabled protocols: {}", disabled); - enabledProtocols.removeAll(disabled); - } - - LOG.trace("Enabled protocols: {}", enabledProtocols); - - return enabledProtocols.toArray(new String[0]); - } - - private static String[] buildEnabledCipherSuites(SSLEngine engine, NettyTransportSslOptions options) { - List<String> enabledCipherSuites = new ArrayList<>(); - - if (options.getEnabledCipherSuites() != null) { - List<String> configuredCipherSuites = Arrays.asList(options.getEnabledCipherSuites()); - LOG.trace("Configured cipher suites from transport options: {}", configuredCipherSuites); - enabledCipherSuites.addAll(configuredCipherSuites); - } else { - List<String> engineCipherSuites = Arrays.asList(engine.getEnabledCipherSuites()); - LOG.trace("Default cipher suites from the SSLEngine: {}", engineCipherSuites); - enabledCipherSuites.addAll(engineCipherSuites); - } - - String[] disabledCipherSuites = options.getDisabledCipherSuites(); - if (disabledCipherSuites != null) { - List<String> disabled = Arrays.asList(disabledCipherSuites); - LOG.trace("Disabled cipher suites: {}", disabled); - enabledCipherSuites.removeAll(disabled); - } - - LOG.trace("Enabled cipher suites: {}", enabledCipherSuites); - - return enabledCipherSuites.toArray(new String[0]); - } - - private static TrustManager[] loadTrustManagers(NettyTransportSslOptions options) throws Exception { - if (options.isTrustAll()) { - return new TrustManager[] {createTrustAllTrustManager()}; - } - - if (options.getTrustStoreLocation() == null) { - return null; - } - - TrustManagerFactory fact = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); - - String storeLocation = options.getTrustStoreLocation(); - String storePassword = options.getTrustStorePassword(); - String storeType = options.getStoreType(); - - LOG.trace("Attempt to load TrustStore from location {} of type {}", storeLocation, storeType); - - KeyStore trustStore = loadStore(storeLocation, storePassword, storeType); - fact.init(trustStore); - - return fact.getTrustManagers(); - } - - private static KeyManager[] loadKeyManagers(NettyTransportSslOptions options) throws Exception { - if (options.getKeyStoreLocation() == null) { - return null; - } - - KeyManagerFactory fact = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); - - String storeLocation = options.getKeyStoreLocation(); - String storePassword = options.getKeyStorePassword(); - String storeType = options.getStoreType(); - String alias = options.getKeyAlias(); - - LOG.trace("Attempt to load KeyStore from location {} of type {}", storeLocation, storeType); - - KeyStore keyStore = loadStore(storeLocation, storePassword, storeType); - fact.init(keyStore, storePassword != null ? storePassword.toCharArray() : null); - - if (alias == null) { - return fact.getKeyManagers(); - } else { - validateAlias(keyStore, alias); - return wrapKeyManagers(alias, fact.getKeyManagers()); - } - } - - private static KeyManager[] wrapKeyManagers(String alias, KeyManager[] origKeyManagers) { - KeyManager[] keyManagers = new KeyManager[origKeyManagers.length]; - for (int i = 0; i < origKeyManagers.length; i++) { - KeyManager km = origKeyManagers[i]; - if (km instanceof X509ExtendedKeyManager) { - km = new X509AliasKeyManager(alias, (X509ExtendedKeyManager) km); - } - - keyManagers[i] = km; - } - - return keyManagers; - } - - private static void validateAlias(KeyStore store, String alias) throws IllegalArgumentException, KeyStoreException { - if (!store.containsAlias(alias)) { - throw new IllegalArgumentException("The alias '" + alias + "' doesn't exist in the key store"); - } - - if (!store.isKeyEntry(alias)) { - throw new IllegalArgumentException("The alias '" + alias + "' in the keystore doesn't represent a key entry"); - } - } - - private static KeyStore loadStore(String storePath, final String password, String storeType) throws Exception { - KeyStore store = KeyStore.getInstance(storeType); - try (InputStream in = new FileInputStream(new File(storePath));) { - store.load(in, password != null ? password.toCharArray() : null); - } - - return store; - } - - private static TrustManager createTrustAllTrustManager() { - return new X509TrustManager() { - @Override - public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { - } - - @Override - public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException { - } - - @Override - public X509Certificate[] getAcceptedIssuers() { - return new X509Certificate[0]; - } - }; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java deleted file mode 100644 index 9b0e6e2..0000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.transport.amqp.client.transport; - -import java.io.IOException; -import java.net.URI; -import java.nio.charset.StandardCharsets; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelPipeline; -import io.netty.handler.codec.http.DefaultHttpHeaders; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpClientCodec; -import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; -import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; -import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame; -import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; -import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; -import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; -import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; -import io.netty.handler.codec.http.websocketx.WebSocketFrame; -import io.netty.handler.codec.http.websocketx.WebSocketVersion; - -/** - * Transport for communicating over WebSockets - */ -public class NettyWSTransport extends NettyTcpTransport { - - private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class); - - private static final String AMQP_SUB_PROTOCOL = "amqp"; - - /** - * Create a new transport instance - * - * @param remoteLocation - * the URI that defines the remote resource to connect to. - * @param options - * the transport options used to configure the socket connection. - */ - public NettyWSTransport(URI remoteLocation, NettyTransportOptions options) { - this(null, remoteLocation, options); - } - - /** - * Create a new transport instance - * - * @param listener - * the TransportListener that will receive events from this Transport. - * @param remoteLocation - * the URI that defines the remote resource to connect to. - * @param options - * the transport options used to configure the socket connection. - */ - public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) { - super(listener, remoteLocation, options); - } - - @Override - public void send(ByteBuf output) throws IOException { - checkConnected(); - int length = output.readableBytes(); - if (length == 0) { - return; - } - - LOG.trace("Attempted write of: {} bytes", length); - - channel.writeAndFlush(new BinaryWebSocketFrame(output)); - } - - @Override - protected ChannelInboundHandlerAdapter createChannelHandler() { - return new NettyWebSocketTransportHandler(); - } - - @Override - protected void addAdditionalHandlers(ChannelPipeline pipeline) { - pipeline.addLast(new HttpClientCodec()); - pipeline.addLast(new HttpObjectAggregator(8192)); - } - - @Override - protected void handleConnected(Channel channel) throws Exception { - LOG.trace("Channel has become active, awaiting WebSocket handshake! Channel is {}", channel); - } - - // ----- Handle connection events -----------------------------------------// - - private class NettyWebSocketTransportHandler extends NettyDefaultHandler<Object> { - - private final WebSocketClientHandshaker handshaker; - - NettyWebSocketTransportHandler() { - handshaker = WebSocketClientHandshakerFactory.newHandshaker( - getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL, - true, new DefaultHttpHeaders(), getMaxFrameSize()); - } - - @Override - public void channelActive(ChannelHandlerContext context) throws Exception { - handshaker.handshake(context.channel()); - - super.channelActive(context); - } - - @Override - protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception { - LOG.trace("New data read: incoming: {}", message); - - Channel ch = ctx.channel(); - if (!handshaker.isHandshakeComplete()) { - handshaker.finishHandshake(ch, (FullHttpResponse) message); - LOG.trace("WebSocket Client connected! {}", ctx.channel()); - // Now trigger super processing as we are really connected. - NettyWSTransport.super.handleConnected(ch); - return; - } - - // We shouldn't get this since we handle the handshake previously. - if (message instanceof FullHttpResponse) { - FullHttpResponse response = (FullHttpResponse) message; - throw new IllegalStateException( - "Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(StandardCharsets.UTF_8) + ')'); - } - - WebSocketFrame frame = (WebSocketFrame) message; - if (frame instanceof TextWebSocketFrame) { - TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; - LOG.warn("WebSocket Client received message: " + textFrame.text()); - ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket.")); - } else if (frame instanceof BinaryWebSocketFrame) { - BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame; - LOG.trace("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes()); - listener.onData(binaryFrame.content()); - } else if (frame instanceof ContinuationWebSocketFrame) { - ContinuationWebSocketFrame continuationFrame = (ContinuationWebSocketFrame) frame; - LOG.trace("WebSocket Client received data continuation: {} bytes", continuationFrame.content().readableBytes()); - listener.onData(continuationFrame.content()); - } else if (frame instanceof PingWebSocketFrame) { - LOG.trace("WebSocket Client received ping, response with pong"); - ch.write(new PongWebSocketFrame(frame.content())); - } else if (frame instanceof CloseWebSocketFrame) { - LOG.trace("WebSocket Client received closing"); - ch.close(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java deleted file mode 100644 index 42d6a0b..0000000 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.activemq.transport.amqp.client.transport; - -import javax.net.ssl.SSLEngine; -import javax.net.ssl.X509ExtendedKeyManager; -import java.net.Socket; -import java.security.Principal; -import java.security.PrivateKey; -import java.security.cert.X509Certificate; - -/** - * An X509ExtendedKeyManager wrapper which always chooses and only - * returns the given alias, and defers retrieval to the delegate - * key manager. - */ -public class X509AliasKeyManager extends X509ExtendedKeyManager { - - private X509ExtendedKeyManager delegate; - private String alias; - - public X509AliasKeyManager(String alias, X509ExtendedKeyManager delegate) throws IllegalArgumentException { - if (alias == null) { - throw new IllegalArgumentException("The given key alias must not be null."); - } - - this.alias = alias; - this.delegate = delegate; - } - - @Override - public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) { - return alias; - } - - @Override - public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) { - return alias; - } - - @Override - public X509Certificate[] getCertificateChain(String alias) { - return delegate.getCertificateChain(alias); - } - - @Override - public String[] getClientAliases(String keyType, Principal[] issuers) { - return new String[]{alias}; - } - - @Override - public PrivateKey getPrivateKey(String alias) { - return delegate.getPrivateKey(alias); - } - - @Override - public String[] getServerAliases(String keyType, Principal[] issuers) { - return new String[]{alias}; - } - - @Override - public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) { - return alias; - } - - @Override - public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) { - return alias; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java new file mode 100644 index 0000000..9eab670 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.netty; + +import java.io.IOException; +import java.net.URI; +import java.security.Principal; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.FixedRecvByteBufAllocator; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; + +/** + * TCP based transport that uses Netty as the underlying IO layer. + */ +public class NettyTcpTransport implements NettyTransport { + + private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class); + + private static final int SHUTDOWN_TIMEOUT = 100; + public static final int DEFAULT_MAX_FRAME_SIZE = 65535; + + protected Bootstrap bootstrap; + protected EventLoopGroup group; + protected Channel channel; + protected NettyTransportListener listener; + protected final NettyTransportOptions options; + protected final URI remote; + protected int maxFrameSize = DEFAULT_MAX_FRAME_SIZE; + + private final AtomicBoolean connected = new AtomicBoolean(); + private final AtomicBoolean closed = new AtomicBoolean(); + private final CountDownLatch connectLatch = new CountDownLatch(1); + private volatile IOException failureCause; + + /** + * Create a new transport instance + * + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. + */ + public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) { + this(null, remoteLocation, options); + } + + /** + * Create a new transport instance + * + * @param listener + * the TransportListener that will receive events from this Transport. + * @param remoteLocation + * the URI that defines the remote resource to connect to. + * @param options + * the transport options used to configure the socket connection. + */ + public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) { + if (options == null) { + throw new IllegalArgumentException("Transport Options cannot be null"); + } + + if (remoteLocation == null) { + throw new IllegalArgumentException("Transport remote location cannot be null"); + } + + this.options = options; + this.listener = listener; + this.remote = remoteLocation; + } + + @Override + public void connect() throws IOException { + + if (listener == null) { + throw new IllegalStateException("A transport listener must be set before connection attempts."); + } + + final SslHandler sslHandler; + if (isSSL()) { + try { + sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions()); + } catch (Exception ex) { + // TODO: can we stop it throwing Exception? + throw IOExceptionSupport.create(ex); + } + } else { + sslHandler = null; + } + + group = new NioEventLoopGroup(1); + + bootstrap = new Bootstrap(); + bootstrap.group(group); + bootstrap.channel(NioSocketChannel.class); + bootstrap.handler(new ChannelInitializer<Channel>() { + @Override + public void initChannel(Channel connectedChannel) throws Exception { + configureChannel(connectedChannel, sslHandler); + } + }); + + configureNetty(bootstrap, getTransportOptions()); + + ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort()); + future.addListener(new ChannelFutureListener() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + handleException(future.channel(), IOExceptionSupport.create(future.cause())); + } + } + }); + + try { + connectLatch.await(); + } catch (InterruptedException ex) { + LOG.debug("Transport connection was interrupted."); + Thread.interrupted(); + failureCause = IOExceptionSupport.create(ex); + } + + if (failureCause != null) { + // Close out any Netty resources now as they are no longer needed. + if (channel != null) { + channel.close().syncUninterruptibly(); + channel = null; + } + if (group != null) { + Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + group = null; + } + + throw failureCause; + } else { + // Connected, allow any held async error to fire now and close the transport. + channel.eventLoop().execute(new Runnable() { + + @Override + public void run() { + if (failureCause != null) { + channel.pipeline().fireExceptionCaught(failureCause); + } + } + }); + } + } + + @Override + public boolean isConnected() { + return connected.get(); + } + + @Override + public boolean isSSL() { + return options.isSSL(); + } + + @Override + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + connected.set(false); + try { + if (channel != null) { + channel.close().syncUninterruptibly(); + } + } finally { + if (group != null) { + Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS); + if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) { + LOG.trace("Channel group shutdown failed to complete in allotted time"); + } + } + } + } + } + + @Override + public ByteBuf allocateSendBuffer(int size) throws IOException { + checkConnected(); + return channel.alloc().ioBuffer(size, size); + } + + @Override + public ChannelFuture send(ByteBuf output) throws IOException { + checkConnected(); + int length = output.readableBytes(); + if (length == 0) { + return null; + } + + LOG.trace("Attempted write of: {} bytes", length); + + return channel.writeAndFlush(output); + } + + @Override + public NettyTransportListener getTransportListener() { + return listener; + } + + @Override + public void setTransportListener(NettyTransportListener listener) { + this.listener = listener; + } + + @Override + public NettyTransportOptions getTransportOptions() { + return options; + } + + @Override + public URI getRemoteLocation() { + return remote; + } + + @Override + public Principal getLocalPrincipal() { + Principal result = null; + + if (isSSL()) { + SslHandler sslHandler = channel.pipeline().get(SslHandler.class); + result = sslHandler.engine().getSession().getLocalPrincipal(); + } + + return result; + } + + @Override + public void setMaxFrameSize(int maxFrameSize) { + if (connected.get()) { + throw new IllegalStateException("Cannot change Max Frame Size while connected."); + } + + this.maxFrameSize = maxFrameSize; + } + + @Override + public int getMaxFrameSize() { + return maxFrameSize; + } + + // ----- Internal implementation details, can be overridden as needed -----// + + protected String getRemoteHost() { + return remote.getHost(); + } + + protected int getRemotePort() { + if (remote.getPort() != -1) { + return remote.getPort(); + } else { + return isSSL() ? getSslOptions().getDefaultSslPort() : getTransportOptions().getDefaultTcpPort(); + } + } + + protected void addAdditionalHandlers(ChannelPipeline pipeline) { + + } + + protected ChannelInboundHandlerAdapter createChannelHandler() { + return new NettyTcpTransportHandler(); + } + + // ----- Event Handlers which can be overridden in subclasses -------------// + + protected void handleConnected(Channel channel) throws Exception { + LOG.trace("Channel has become active! Channel is {}", channel); + connectionEstablished(channel); + } + + protected void handleChannelInactive(Channel channel) throws Exception { + LOG.trace("Channel has gone inactive! Channel is {}", channel); + if (connected.compareAndSet(true, false) && !closed.get()) { + LOG.trace("Firing onTransportClosed listener"); + listener.onTransportClosed(); + } + } + + protected void handleException(Channel channel, Throwable cause) throws Exception { + LOG.trace("Exception on channel! Channel is {}", channel); + if (connected.compareAndSet(true, false) && !closed.get()) { + LOG.trace("Firing onTransportError listener"); + if (failureCause != null) { + listener.onTransportError(failureCause); + } else { + listener.onTransportError(cause); + } + } else { + // Hold the first failure for later dispatch if connect succeeds. + // This will then trigger disconnect using the first error reported. + if (failureCause == null) { + LOG.trace("Holding error until connect succeeds: {}", cause.getMessage()); + failureCause = IOExceptionSupport.create(cause); + } + + connectionFailed(channel, failureCause); + } + } + + // ----- State change handlers and checks ---------------------------------// + + protected final void checkConnected() throws IOException { + if (!connected.get()) { + throw new IOException("Cannot send to a non-connected transport."); + } + } + + /* + * Called when the transport has successfully connected and is ready for use. + */ + private void connectionEstablished(Channel connectedChannel) { + channel = connectedChannel; + connected.set(true); + connectLatch.countDown(); + } + + /* + * Called when the transport connection failed and an error should be returned. + */ + private void connectionFailed(Channel failedChannel, IOException cause) { + failureCause = cause; + channel = failedChannel; + connected.set(false); + connectLatch.countDown(); + } + + private NettyTransportSslOptions getSslOptions() { + return (NettyTransportSslOptions) getTransportOptions(); + } + + private void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) { + bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay()); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout()); + bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive()); + bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger()); + + if (options.getSendBufferSize() != -1) { + bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize()); + } + + if (options.getReceiveBufferSize() != -1) { + bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize()); + bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize())); + } + + if (options.getTrafficClass() != -1) { + bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass()); + } + } + + private void configureChannel(final Channel channel, final SslHandler sslHandler) throws Exception { + if (isSSL()) { + channel.pipeline().addLast(sslHandler); + } + + if (getTransportOptions().isTraceBytes()) { + channel.pipeline().addLast("logger", new LoggingHandler(getClass())); + } + + addAdditionalHandlers(channel.pipeline()); + + channel.pipeline().addLast(createChannelHandler()); + } + + // ----- Handle connection events -----------------------------------------// + + protected abstract class NettyDefaultHandler<E> extends SimpleChannelInboundHandler<E> { + + @Override + public void channelRegistered(ChannelHandlerContext context) throws Exception { + channel = context.channel(); + } + + @Override + public void channelActive(ChannelHandlerContext context) throws Exception { + // In the Secure case we need to let the handshake complete before we + // trigger the connected event. + if (!isSSL()) { + handleConnected(context.channel()); + } else { + SslHandler sslHandler = context.pipeline().get(SslHandler.class); + sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() { + @Override + public void operationComplete(Future<Channel> future) throws Exception { + if (future.isSuccess()) { + LOG.trace("SSL Handshake has completed: {}", channel); + handleConnected(channel); + } else { + LOG.trace("SSL Handshake has failed: {}", channel); + handleException(channel, future.cause()); + } + } + }); + } + } + + @Override + public void channelInactive(ChannelHandlerContext context) throws Exception { + handleChannelInactive(context.channel()); + } + + @Override + public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { + handleException(context.channel(), cause); + } + } + + // ----- Handle Binary data from connection -------------------------------// + + protected class NettyTcpTransportHandler extends NettyDefaultHandler<ByteBuf> { + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { + LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer); + listener.onData(buffer); + } + } +}