Murtadha Hubail has submitted this change and it was merged. Change subject: [ASTERIXDB-2490][NET] Support Encrypted Multiplexed Connections ......................................................................
[ASTERIXDB-2490][NET] Support Encrypted Multiplexed Connections - user model changes: no - storage format changes: no - interface changes: yes Details: - Use SocketChannelFactory in multiplex connections to support both unencrypted and encrypted sockets. - Adapt TCPEndpoint to socket channels that require handshake. - Adapt test cases to API changes. Change-Id: I9cbed93c162018bad17923d50d4987011cbba16c Reviewed-on: https://asterix-gerrit.ics.uci.edu/3071 Sonar-Qube: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- M asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java M hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java M hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java M hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java M hyracks-fullstack/hyracks/hyracks-net/pom.xml M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java M hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java M hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java 18 files changed, 188 insertions(+), 59 deletions(-) Approvals: Anon. E. Moose #1000171: Jenkins: Verified; No violations found; ; Verified Michael Blow: Looks good to me, approved Murtadha Hubail: Looks good to me, but someone else must approve diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java index 42ec795..f69c102 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java @@ -19,10 +19,10 @@ package org.apache.asterix.messaging; import java.io.IOException; -import java.nio.channels.SocketChannel; import org.apache.hyracks.api.comm.IBufferAcceptor; import org.apache.hyracks.api.exceptions.NetException; +import org.apache.hyracks.api.network.ISocketChannel; import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelReadInterface; public class MessagingChannelReadInterface extends AbstractChannelReadInterface { @@ -32,7 +32,7 @@ } @Override - public int read(SocketChannel sc, int size) throws IOException, NetException { + public int read(ISocketChannel sc, int size) throws IOException, NetException { while (true) { if (size <= 0) { return size; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java index 357d761..4429219 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java @@ -19,9 +19,9 @@ package org.apache.hyracks.api.comm; import java.io.IOException; -import java.nio.channels.SocketChannel; import org.apache.hyracks.api.exceptions.NetException; +import org.apache.hyracks.api.network.ISocketChannel; /** * Represents the read interface of a {@link IChannelControlBlock}. @@ -68,7 +68,7 @@ * @throws IOException * @throws NetException */ - public int read(SocketChannel sc, int size) throws IOException, NetException; + public int read(ISocketChannel sc, int size) throws IOException, NetException; /** * Sets the read credits of this {@link IChannelReadInterface} diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java index c238ae3..5fe6ecb 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java @@ -23,6 +23,7 @@ import java.net.SocketAddress; import org.apache.hyracks.api.exceptions.NetException; +import org.apache.hyracks.api.network.ISocketChannelFactory; import org.apache.hyracks.comm.channels.IChannelConnectionFactory; import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock; import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory; @@ -35,11 +36,12 @@ private final MuxDemux md; - public ClientNetworkManager(int nThreads) throws IOException { + public ClientNetworkManager(int nThreads, ISocketChannelFactory socketChannelFactory) { /* This is a connect only socket and does not listen to any incoming connections, so pass null to * localAddress and listener. */ - md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS, FullFrameChannelInterfaceFactory.INSTANCE); + md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS, FullFrameChannelInterfaceFactory.INSTANCE, + socketChannelFactory); } public void start() throws IOException { diff --git a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java index a72573c..4d8767f 100644 --- a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java +++ b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java @@ -44,7 +44,7 @@ NetworkAddress ddsAddress = hcc.getResultDirectoryAddress(); resultDirectory = new ResultDirectory(ddsAddress.getAddress(), ddsAddress.getPort(), socketChannelFactory); - netManager = new ClientNetworkManager(nReaders); + netManager = new ClientNetworkManager(nReaders, socketChannelFactory); netManager.start(); resultClientCtx = new ResultClientContext(frameSize); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index a92fcb6..317d59a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -273,12 +273,12 @@ resultNetworkManager = new ResultNetworkManager(ncConfig.getResultListenAddress(), ncConfig.getResultListenPort(), resultPartitionManager, ncConfig.getNetThreadCount(), ncConfig.getNetBufferCount(), ncConfig.getResultPublicAddress(), ncConfig.getResultPublicPort(), - FullFrameChannelInterfaceFactory.INSTANCE); + FullFrameChannelInterfaceFactory.INSTANCE, networkSecurityManager.getSocketChannelFactory()); if (ncConfig.getMessagingListenAddress() != null && serviceCtx.getMessagingChannelInterfaceFactory() != null) { messagingNetManager = new MessagingNetworkManager(this, ncConfig.getMessagingListenAddress(), ncConfig.getMessagingListenPort(), ncConfig.getNetThreadCount(), ncConfig.getMessagingPublicAddress(), ncConfig.getMessagingPublicPort(), - serviceCtx.getMessagingChannelInterfaceFactory()); + serviceCtx.getMessagingChannelInterfaceFactory(), networkSecurityManager.getSocketChannelFactory()); } } @@ -292,7 +292,8 @@ partitionManager = new PartitionManager(this); netManager = new NetworkManager(ncConfig.getDataListenAddress(), ncConfig.getDataListenPort(), partitionManager, ncConfig.getNetThreadCount(), ncConfig.getNetBufferCount(), ncConfig.getDataPublicAddress(), - ncConfig.getDataPublicPort(), FullFrameChannelInterfaceFactory.INSTANCE); + ncConfig.getDataPublicPort(), FullFrameChannelInterfaceFactory.INSTANCE, + networkSecurityManager.getSocketChannelFactory()); netManager.start(); startApplication(); init(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java index a37d131..4c7270c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java @@ -32,6 +32,7 @@ import org.apache.hyracks.api.comm.ICloseableBufferAcceptor; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.exceptions.NetException; +import org.apache.hyracks.api.network.ISocketChannelFactory; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock; import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener; @@ -53,10 +54,11 @@ private final Map<IChannelControlBlock, ICloseableBufferAcceptor> channelFullBufferAcceptor = new HashMap<>(); public MessagingNetworkManager(NodeControllerService ncs, String inetAddress, int inetPort, int nThreads, - String publicInetAddress, int publicInetPort, IChannelInterfaceFactory channelInterfaceFactory) { + String publicInetAddress, int publicInetPort, IChannelInterfaceFactory channelInterfaceFactory, + ISocketChannelFactory socketChannelFactory) { this.ncs = ncs; md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads, - MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory); + MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory, socketChannelFactory); publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java index 3298b78..6876618 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java @@ -29,6 +29,7 @@ import org.apache.hyracks.api.dataflow.ConnectorDescriptorId; import org.apache.hyracks.api.exceptions.NetException; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.network.ISocketChannelFactory; import org.apache.hyracks.api.partitions.PartitionId; import org.apache.hyracks.comm.channels.IChannelConnectionFactory; import org.apache.hyracks.comm.channels.NetworkOutputChannel; @@ -60,11 +61,11 @@ public NetworkManager(String inetAddress, int inetPort, PartitionManager partitionManager, int nThreads, int nBuffers, String publicInetAddress, int publicInetPort, - IChannelInterfaceFactory channelInterfaceFactory) { + IChannelInterfaceFactory channelInterfaceFactory, ISocketChannelFactory socketChannelFactory) { this.partitionManager = partitionManager; this.nBuffers = nBuffers; md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads, - MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory); + MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory, socketChannelFactory); // Just save these values for the moment; may be reset in start() publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java index ee821d6..fe8f2af 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java @@ -29,6 +29,7 @@ import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.exceptions.NetException; import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.network.ISocketChannelFactory; import org.apache.hyracks.api.result.IResultPartitionManager; import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.comm.channels.IChannelConnectionFactory; @@ -73,11 +74,11 @@ */ public ResultNetworkManager(String inetAddress, int inetPort, IResultPartitionManager partitionManager, int nThreads, int nBuffers, String publicInetAddress, int publicInetPort, - IChannelInterfaceFactory channelInterfaceFactory) { + IChannelInterfaceFactory channelInterfaceFactory, ISocketChannelFactory socketChannelFactory) { this.partitionManager = partitionManager; this.nBuffers = nBuffers; md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new ChannelOpenListener(), nThreads, - MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory); + MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory, socketChannelFactory); // Just save these values for the moment; may be reset in start() publicNetworkAddress = new NetworkAddress(publicInetAddress, publicInetPort); } diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java index 205ecfe..764ec7d 100644 --- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java +++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java @@ -215,10 +215,10 @@ SelectionKey key = i.next(); i.remove(); final SelectableChannel sc = key.channel(); - // do not attempt to read until handle is set (e.g. after handshake is completed) + // do not attempt to read/write until handle is set (e.g. after handshake is completed) if (key.isReadable() && key.attachment() != null) { read(key); - } else if (key.isWritable()) { + } else if (key.isWritable() && key.attachment() != null) { write(key); } else if (key.isAcceptable()) { assert sc == serverSocketChannel; diff --git a/hyracks-fullstack/hyracks/hyracks-net/pom.xml b/hyracks-fullstack/hyracks/hyracks-net/pom.xml index 2cddf45..2dca39b 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/pom.xml +++ b/hyracks-fullstack/hyracks/hyracks-net/pom.xml @@ -47,6 +47,11 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-ipc</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java index 3a35212..0a26097 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.channels.SocketChannel; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hyracks.api.comm.IChannelControlBlock; @@ -28,6 +27,7 @@ import org.apache.hyracks.api.comm.IChannelReadInterface; import org.apache.hyracks.api.comm.IChannelWriteInterface; import org.apache.hyracks.api.exceptions.NetException; +import org.apache.hyracks.api.network.ISocketChannel; import org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection.WriterState; import org.apache.hyracks.util.JSONUtil; @@ -91,7 +91,7 @@ wi.writeComplete(); } - synchronized int read(SocketChannel sc, int size) throws IOException, NetException { + synchronized int read(ISocketChannel sc, int size) throws IOException, NetException { return ri.read(sc, size); } diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java index 3ba8627..ab6dbf1 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java @@ -20,13 +20,13 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; import java.util.concurrent.BlockingDeque; import java.util.concurrent.LinkedBlockingDeque; import org.apache.hyracks.api.comm.IBufferFactory; import org.apache.hyracks.api.comm.IChannelControlBlock; import org.apache.hyracks.api.exceptions.NetException; +import org.apache.hyracks.api.network.ISocketChannel; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -52,7 +52,7 @@ } @Override - public int read(SocketChannel sc, int size) throws IOException, NetException { + public int read(ISocketChannel sc, int size) throws IOException, NetException { while (true) { if (size <= 0) { return size; diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java index 96ccafb..f7c3826 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java @@ -22,7 +22,6 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; import java.util.BitSet; import java.util.Optional; @@ -31,6 +30,7 @@ import org.apache.hyracks.api.comm.IConnectionWriterState; import org.apache.hyracks.api.comm.MuxDemuxCommand; import org.apache.hyracks.api.exceptions.NetException; +import org.apache.hyracks.api.network.ISocketChannel; import org.apache.hyracks.net.protocols.tcp.ITCPConnectionEventListener; import org.apache.hyracks.net.protocols.tcp.TCPConnection; import org.apache.hyracks.util.JSONUtil; @@ -160,6 +160,8 @@ private IChannelControlBlock ccb; + private boolean pendingWriteCompletion = false; + public WriterState() { cmdWriteBuffer = ByteBuffer.allocateDirect(MuxDemuxCommand.COMMAND_SIZE); cmdWriteBuffer.flip(); @@ -168,7 +170,8 @@ } boolean writePending() { - return cmdWriteBuffer.remaining() > 0 || (pendingBuffer != null && pendingWriteSize > 0); + return cmdWriteBuffer.remaining() > 0 || (pendingBuffer != null && pendingWriteSize > 0) + || pendingWriteCompletion; } @Override @@ -181,7 +184,10 @@ this.ccb = ccb; } - boolean performPendingWrite(SocketChannel sc) throws IOException { + boolean performPendingWrite(ISocketChannel sc) throws IOException { + if (pendingWriteCompletion && !sc.completeWrite()) { + return false; + } int len = cmdWriteBuffer.remaining(); if (len > 0) { int written = sc.write(cmdWriteBuffer); @@ -209,10 +215,16 @@ pendingBuffer = null; pendingWriteSize = 0; } + // must ensure all pending writes are performed before calling ccb.writeComplete() + if (sc.isPendingWrite()) { + pendingWriteCompletion = true; + return false; + } if (ccb != null) { ccb.writeComplete(); ccb = null; } + pendingWriteCompletion = false; return true; } @@ -223,7 +235,7 @@ } void driveWriterStateMachine() throws IOException, NetException { - SocketChannel sc = tcpConnection.getSocketChannel(); + ISocketChannel sc = tcpConnection.getSocketChannel(); if (writerState.writePending()) { if (!writerState.performPendingWrite(sc)) { return; @@ -339,9 +351,9 @@ } void driveReaderStateMachine() throws IOException, NetException { - SocketChannel sc = tcpConnection.getSocketChannel(); + ISocketChannel sc = tcpConnection.getSocketChannel(); int chunksRead = 0; - while (chunksRead < MAX_CHUNKS_READ_PER_CYCLE) { + while (chunksRead < MAX_CHUNKS_READ_PER_CYCLE || sc.isPendingRead()) { if (readerState.readBuffer.remaining() > 0) { int read = sc.read(readerState.readBuffer); if (read < 0) { diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java index 4ee7e83..39ce408 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java @@ -25,6 +25,7 @@ import org.apache.hyracks.api.comm.IChannelInterfaceFactory; import org.apache.hyracks.api.exceptions.NetException; +import org.apache.hyracks.api.network.ISocketChannelFactory; import org.apache.hyracks.net.protocols.tcp.ITCPConnectionListener; import org.apache.hyracks.net.protocols.tcp.TCPConnection; import org.apache.hyracks.net.protocols.tcp.TCPEndpoint; @@ -68,13 +69,18 @@ * - Number of threads to use for data transfer * @param maxConnectionAttempts * - Maximum number of connection attempts + * @param channelInterfaceFactory + * - The channel interface factory + * @param socketChannelFactory + * - The socket channel factory */ public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener listener, int nThreads, - int maxConnectionAttempts, IChannelInterfaceFactory channelInterfaceFatory) { + int maxConnectionAttempts, IChannelInterfaceFactory channelInterfaceFactory, + ISocketChannelFactory socketChannelFactory) { this.localAddress = localAddress; this.channelOpenListener = listener; this.maxConnectionAttempts = maxConnectionAttempts; - this.channelInterfaceFatory = channelInterfaceFatory; + this.channelInterfaceFatory = channelInterfaceFactory; outgoingConnectionMap = new HashMap<>(); incomingConnectionMap = new HashMap<>(); this.tcpEndpoint = new TCPEndpoint(new ITCPConnectionListener() { @@ -126,7 +132,7 @@ } } } - }, nThreads); + }, nThreads, socketChannelFactory); perfCounters = new MuxDemuxPerformanceCounters(); } diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java index ff4627a..1814edb 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java @@ -22,8 +22,8 @@ import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; +import org.apache.hyracks.api.network.ISocketChannel; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -38,7 +38,7 @@ private final TCPEndpoint endpoint; - private final SocketChannel channel; + private final ISocketChannel channel; private final InetSocketAddress remoteAddress; private final SelectionKey key; @@ -50,26 +50,26 @@ private ConnectionType type; - public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, SelectionKey key, Selector selector, + public TCPConnection(TCPEndpoint endpoint, ISocketChannel channel, SelectionKey key, Selector selector, ConnectionType type) { this.endpoint = endpoint; this.channel = channel; this.key = key; this.selector = selector; this.type = type; - remoteAddress = (InetSocketAddress) channel.socket().getRemoteSocketAddress(); + remoteAddress = (InetSocketAddress) channel.getSocketChannel().socket().getRemoteSocketAddress(); } public TCPEndpoint getEndpoint() { return endpoint; } - public SocketChannel getSocketChannel() { + public ISocketChannel getSocketChannel() { return channel; } public InetSocketAddress getLocalAddress() { - return (InetSocketAddress) channel.socket().getLocalSocketAddress(); + return (InetSocketAddress) channel.getSocketChannel().socket().getLocalSocketAddress(); } public InetSocketAddress getRemoteAddress() { diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java index 05e2175..faec871 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java @@ -19,6 +19,8 @@ package org.apache.hyracks.net.protocols.tcp; import static org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType; +import static org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType.INCOMING; +import static org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType.OUTGOING; import java.io.IOException; import java.net.InetSocketAddress; @@ -31,7 +33,10 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CompletableFuture; +import org.apache.hyracks.api.network.ISocketChannel; +import org.apache.hyracks.api.network.ISocketChannelFactory; import org.apache.hyracks.util.NetworkUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -52,9 +57,13 @@ private int nextThread; - public TCPEndpoint(ITCPConnectionListener connectionListener, int nThreads) { + private final ISocketChannelFactory socketChannelFactory; + + public TCPEndpoint(ITCPConnectionListener connectionListener, int nThreads, + ISocketChannelFactory socketChannelFactory) { this.connectionListener = connectionListener; this.nThreads = nThreads; + this.socketChannelFactory = socketChannelFactory; } public void start(InetSocketAddress localAddress) throws IOException { @@ -113,6 +122,8 @@ private final List<SocketChannel> workingIncomingConnections; + private final List<PendingHandshakeConnection> handshakeCompletedConnections; + private final Selector selector; public IOThread() throws IOException { @@ -123,6 +134,7 @@ this.workingPendingConnections = new ArrayList<>(); this.incomingConnections = new ArrayList<>(); this.workingIncomingConnections = new ArrayList<>(); + handshakeCompletedConnections = new ArrayList<>(); selector = Selector.open(); } @@ -151,8 +163,7 @@ SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT); key.attach(address); } else { - SelectionKey key = channel.register(selector, 0); - createConnection(key, channel); + socketConnected(address, channel); } } } @@ -161,15 +172,15 @@ if (!workingIncomingConnections.isEmpty()) { for (SocketChannel channel : workingIncomingConnections) { register(channel); - SelectionKey sKey = channel.register(selector, 0); - TCPConnection connection = new TCPConnection(TCPEndpoint.this, channel, sKey, selector, - ConnectionType.INCOMING); - sKey.attach(connection); - synchronized (connectionListener) { - connectionListener.acceptedConnection(connection); - } + connectionReceived(channel); } workingIncomingConnections.clear(); + } + if (!handshakeCompletedConnections.isEmpty()) { + for (final PendingHandshakeConnection conn : handshakeCompletedConnections) { + handshakeCompleted(conn); + } + handshakeCompletedConnections.clear(); } if (n > 0) { Iterator<SelectionKey> i = selector.selectedKeys().iterator(); @@ -211,7 +222,7 @@ } } if (finishConnect) { - createConnection(key, channel); + socketConnected((InetSocketAddress) key.attachment(), channel); } } } @@ -222,13 +233,29 @@ } } - private void createConnection(SelectionKey key, SocketChannel channel) { - TCPConnection connection = - new TCPConnection(TCPEndpoint.this, channel, key, selector, ConnectionType.OUTGOING); - key.attach(connection); - key.interestOps(0); - synchronized (connectionListener) { - connectionListener.connectionEstablished(connection); + private void handshakeCompleted(PendingHandshakeConnection conn) { + try { + if (conn.handshakeSuccess) { + final SelectionKey key = conn.socketChannel.getSocketChannel().register(selector, 0); + final TCPConnection tcpConn = + new TCPConnection(TCPEndpoint.this, conn.socketChannel, key, selector, conn.type); + key.attach(tcpConn); + switch (conn.type) { + case INCOMING: + connectionAccepted(tcpConn); + break; + case OUTGOING: + connectionEstablished(tcpConn); + break; + default: + throw new IllegalStateException("Unknown connection type: " + conn.type); + } + } else { + handleHandshakeFailure(conn); + } + } catch (Exception e) { + LOGGER.error("failed to establish connection after handshake", e); + handleHandshakeFailure(conn); } } @@ -262,5 +289,75 @@ NetworkUtil.configure(channel); channel.configureBlocking(false); } + + private void socketConnected(InetSocketAddress remoteAddress, SocketChannel channel) { + final ISocketChannel socketChannel = socketChannelFactory.createClientChannel(channel); + final PendingHandshakeConnection conn = + new PendingHandshakeConnection(socketChannel, remoteAddress, OUTGOING); + if (socketChannel.requiresHandshake()) { + asyncHandshake(conn); + } else { + conn.handshakeSuccess = true; + handshakeCompletedConnections.add(conn); + } + } + + private void connectionReceived(SocketChannel channel) { + final ISocketChannel socketChannel = socketChannelFactory.createServerChannel(channel); + final PendingHandshakeConnection conn = new PendingHandshakeConnection(socketChannel, null, INCOMING); + if (socketChannel.requiresHandshake()) { + asyncHandshake(conn); + } else { + conn.handshakeSuccess = true; + handshakeCompletedConnections.add(conn); + } + } + + private void asyncHandshake(PendingHandshakeConnection connection) { + CompletableFuture.supplyAsync(connection.socketChannel::handshake).exceptionally(ex -> false) + .thenAccept(handshakeSuccess -> handleHandshakeCompletion(handshakeSuccess, connection)); + } + + private void handleHandshakeCompletion(Boolean handshakeSuccess, PendingHandshakeConnection conn) { + conn.handshakeSuccess = handshakeSuccess; + handshakeCompletedConnections.add(conn); + selector.wakeup(); + } + + private void connectionEstablished(TCPConnection connection) { + synchronized (connectionListener) { + connectionListener.connectionEstablished(connection); + } + } + + private void connectionAccepted(TCPConnection connection) { + synchronized (connectionListener) { + connectionListener.acceptedConnection(connection); + } + } + + private void handleHandshakeFailure(PendingHandshakeConnection conn) { + NetworkUtil.closeQuietly(conn.socketChannel); + if (conn.type == OUTGOING) { + synchronized (connectionListener) { + connectionListener.connectionFailure(conn.address, new IOException("handshake failure")); + } + } + } + } + + private static class PendingHandshakeConnection { + + private final ISocketChannel socketChannel; + private final ConnectionType type; + private final InetSocketAddress address; + private boolean handshakeSuccess = false; + + PendingHandshakeConnection(ISocketChannel socketChannel, InetSocketAddress address, + ConnectionType connectionType) { + this.socketChannel = socketChannel; + this.type = connectionType; + this.address = address; + } } } diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java index f9a610c..6d9e7c2 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java @@ -28,6 +28,7 @@ import org.apache.hyracks.api.comm.IBufferFactory; import org.apache.hyracks.api.comm.IChannelControlBlock; import org.apache.hyracks.api.comm.ICloseableBufferAcceptor; +import org.apache.hyracks.api.network.ISocketChannel; import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock; import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelReadInterface; import org.apache.hyracks.util.StorageUtil; @@ -61,7 +62,7 @@ readInterface.setFullBufferAcceptor(new ReadFullBufferAcceptor(fullBufferQ)); readInterface.setBufferFactory(bufferFactory, RECEIVER_BUFFER_COUNT, FRAME_SIZE); Assert.assertEquals(EXPECTED_CHANNEL_CREDIT, channelCredit.get()); - final SocketChannel socketChannel = mockSocketChannel(ccb); + final ISocketChannel socketChannel = mockSocketChannel(ccb); final Thread networkFrameReader = new Thread(() -> { try { int framesRead = FRAMES_TO_READ_COUNT; @@ -124,8 +125,8 @@ return ccb; } - private SocketChannel mockSocketChannel(IChannelControlBlock ccb) throws IOException { - final SocketChannel sc = Mockito.mock(SocketChannel.class); + private ISocketChannel mockSocketChannel(IChannelControlBlock ccb) throws IOException { + final ISocketChannel sc = Mockito.mock(ISocketChannel.class); Mockito.when(sc.read(Mockito.any(ByteBuffer.class))).thenAnswer(invocation -> { ccb.addPendingCredits(-FRAME_SIZE); final ByteBuffer buffer = invocation.getArgumentAt(0, ByteBuffer.class); diff --git a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java index 8f582ba..ec1d21c 100644 --- a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java +++ b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java @@ -27,6 +27,7 @@ import org.apache.hyracks.api.comm.IBufferAcceptor; import org.apache.hyracks.api.comm.ICloseableBufferAcceptor; +import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory; import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock; import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory; import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener; @@ -162,7 +163,7 @@ } }; return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), md1OpenListener, 1, 5, - FullFrameChannelInterfaceFactory.INSTANCE); + FullFrameChannelInterfaceFactory.INSTANCE, PlainSocketChannelFactory.INSTANCE); } private class ChannelIO { -- To view, visit https://asterix-gerrit.ics.uci.edu/3071 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: I9cbed93c162018bad17923d50d4987011cbba16c Gerrit-PatchSet: 10 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]>
