Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/3071

Change subject: [ASTERIXDB-2490][NET] Support Encrypted Multiplex Connections
......................................................................

[ASTERIXDB-2490][NET] Support Encrypted Multiplex Connections

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Use SocketChannelFactory in multiplex connections
  to support both unencrypted and encrypted sockets.
- Adapt TCPEndpoint to socket channels that require
  handshake.
- Adapt test cases to API changes.

Change-Id: I9cbed93c162018bad17923d50d4987011cbba16c
---
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
M 
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
M 
hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
M 
hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
M 
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
M hyracks-fullstack/hyracks/hyracks-net/pom.xml
M 
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
M 
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
M 
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
M 
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
M 
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
M 
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
M 
hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
M 
hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
17 files changed, 142 insertions(+), 53 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/71/3071/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
index 42ec795..f69c102 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
@@ -19,10 +19,10 @@
 package org.apache.asterix.messaging;
 
 import java.io.IOException;
-import java.nio.channels.SocketChannel;
 
 import org.apache.hyracks.api.comm.IBufferAcceptor;
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelReadInterface;
 
 public class MessagingChannelReadInterface extends 
AbstractChannelReadInterface {
@@ -32,7 +32,7 @@
     }
 
     @Override
-    public int read(SocketChannel sc, int size) throws IOException, 
NetException {
+    public int read(ISocketChannel sc, int size) throws IOException, 
NetException {
         while (true) {
             if (size <= 0) {
                 return size;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
index 357d761..4429219 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
@@ -19,9 +19,9 @@
 package org.apache.hyracks.api.comm;
 
 import java.io.IOException;
-import java.nio.channels.SocketChannel;
 
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
 
 /**
  * Represents the read interface of a {@link IChannelControlBlock}.
@@ -68,7 +68,7 @@
      * @throws IOException
      * @throws NetException
      */
-    public int read(SocketChannel sc, int size) throws IOException, 
NetException;
+    public int read(ISocketChannel sc, int size) throws IOException, 
NetException;
 
     /**
      * Sets the read credits of this {@link IChannelReadInterface}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
index c238ae3..5fe6ecb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
@@ -23,6 +23,7 @@
 import java.net.SocketAddress;
 
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import 
org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
@@ -35,11 +36,12 @@
 
     private final MuxDemux md;
 
-    public ClientNetworkManager(int nThreads) throws IOException {
+    public ClientNetworkManager(int nThreads, ISocketChannelFactory 
socketChannelFactory) {
         /* This is a connect only socket and does not listen to any incoming 
connections, so pass null to
          * localAddress and listener.
          */
-        md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS, 
FullFrameChannelInterfaceFactory.INSTANCE);
+        md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS, 
FullFrameChannelInterfaceFactory.INSTANCE,
+                socketChannelFactory);
     }
 
     public void start() throws IOException {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
index a72573c..4d8767f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
@@ -44,7 +44,7 @@
         NetworkAddress ddsAddress = hcc.getResultDirectoryAddress();
         resultDirectory = new ResultDirectory(ddsAddress.getAddress(), 
ddsAddress.getPort(), socketChannelFactory);
 
-        netManager = new ClientNetworkManager(nReaders);
+        netManager = new ClientNetworkManager(nReaders, socketChannelFactory);
         netManager.start();
 
         resultClientCtx = new ResultClientContext(frameSize);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index a92fcb6..317d59a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -273,12 +273,12 @@
         resultNetworkManager = new 
ResultNetworkManager(ncConfig.getResultListenAddress(),
                 ncConfig.getResultListenPort(), resultPartitionManager, 
ncConfig.getNetThreadCount(),
                 ncConfig.getNetBufferCount(), 
ncConfig.getResultPublicAddress(), ncConfig.getResultPublicPort(),
-                FullFrameChannelInterfaceFactory.INSTANCE);
+                FullFrameChannelInterfaceFactory.INSTANCE, 
networkSecurityManager.getSocketChannelFactory());
         if (ncConfig.getMessagingListenAddress() != null && 
serviceCtx.getMessagingChannelInterfaceFactory() != null) {
             messagingNetManager = new MessagingNetworkManager(this, 
ncConfig.getMessagingListenAddress(),
                     ncConfig.getMessagingListenPort(), 
ncConfig.getNetThreadCount(),
                     ncConfig.getMessagingPublicAddress(), 
ncConfig.getMessagingPublicPort(),
-                    serviceCtx.getMessagingChannelInterfaceFactory());
+                    serviceCtx.getMessagingChannelInterfaceFactory(), 
networkSecurityManager.getSocketChannelFactory());
         }
     }
 
@@ -292,7 +292,8 @@
         partitionManager = new PartitionManager(this);
         netManager = new NetworkManager(ncConfig.getDataListenAddress(), 
ncConfig.getDataListenPort(), partitionManager,
                 ncConfig.getNetThreadCount(), ncConfig.getNetBufferCount(), 
ncConfig.getDataPublicAddress(),
-                ncConfig.getDataPublicPort(), 
FullFrameChannelInterfaceFactory.INSTANCE);
+                ncConfig.getDataPublicPort(), 
FullFrameChannelInterfaceFactory.INSTANCE,
+                networkSecurityManager.getSocketChannelFactory());
         netManager.start();
         startApplication();
         init();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
index a37d131..4c7270c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
 import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
@@ -53,10 +54,11 @@
     private final Map<IChannelControlBlock, ICloseableBufferAcceptor> 
channelFullBufferAcceptor = new HashMap<>();
 
     public MessagingNetworkManager(NodeControllerService ncs, String 
inetAddress, int inetPort, int nThreads,
-            String publicInetAddress, int publicInetPort, 
IChannelInterfaceFactory channelInterfaceFactory) {
+            String publicInetAddress, int publicInetPort, 
IChannelInterfaceFactory channelInterfaceFactory,
+            ISocketChannelFactory socketChannelFactory) {
         this.ncs = ncs;
         md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new 
ChannelOpenListener(), nThreads,
-                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
+                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory, 
socketChannelFactory);
         publicNetworkAddress = new NetworkAddress(publicInetAddress, 
publicInetPort);
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index 3298b78..6876618 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
 import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
 import org.apache.hyracks.comm.channels.NetworkOutputChannel;
@@ -60,11 +61,11 @@
 
     public NetworkManager(String inetAddress, int inetPort, PartitionManager 
partitionManager, int nThreads,
             int nBuffers, String publicInetAddress, int publicInetPort,
-            IChannelInterfaceFactory channelInterfaceFactory) {
+            IChannelInterfaceFactory channelInterfaceFactory, 
ISocketChannelFactory socketChannelFactory) {
         this.partitionManager = partitionManager;
         this.nBuffers = nBuffers;
         md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new 
ChannelOpenListener(), nThreads,
-                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
+                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory, 
socketChannelFactory);
         // Just save these values for the moment; may be reset in start()
         publicNetworkAddress = new NetworkAddress(publicInetAddress, 
publicInetPort);
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
index ee821d6..fe8f2af 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
@@ -29,6 +29,7 @@
 import org.apache.hyracks.api.exceptions.HyracksException;
 import org.apache.hyracks.api.exceptions.NetException;
 import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.api.result.IResultPartitionManager;
 import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
@@ -73,11 +74,11 @@
      */
     public ResultNetworkManager(String inetAddress, int inetPort, 
IResultPartitionManager partitionManager,
             int nThreads, int nBuffers, String publicInetAddress, int 
publicInetPort,
-            IChannelInterfaceFactory channelInterfaceFactory) {
+            IChannelInterfaceFactory channelInterfaceFactory, 
ISocketChannelFactory socketChannelFactory) {
         this.partitionManager = partitionManager;
         this.nBuffers = nBuffers;
         md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new 
ChannelOpenListener(), nThreads,
-                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
+                MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory, 
socketChannelFactory);
         // Just save these values for the moment; may be reset in start()
         publicNetworkAddress = new NetworkAddress(publicInetAddress, 
publicInetPort);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-net/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
index 2cddf45..2dca39b 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
@@ -47,6 +47,11 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-ipc</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 3a35212..0a26097 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.channels.SocketChannel;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hyracks.api.comm.IChannelControlBlock;
@@ -28,6 +27,7 @@
 import org.apache.hyracks.api.comm.IChannelReadInterface;
 import org.apache.hyracks.api.comm.IChannelWriteInterface;
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
 import 
org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection.WriterState;
 import org.apache.hyracks.util.JSONUtil;
 
@@ -91,7 +91,7 @@
         wi.writeComplete();
     }
 
-    synchronized int read(SocketChannel sc, int size) throws IOException, 
NetException {
+    synchronized int read(ISocketChannel sc, int size) throws IOException, 
NetException {
         return ri.read(sc, size);
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
index 3ba8627..ab6dbf1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -20,13 +20,13 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 
 import org.apache.hyracks.api.comm.IBufferFactory;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -52,7 +52,7 @@
     }
 
     @Override
-    public int read(SocketChannel sc, int size) throws IOException, 
NetException {
+    public int read(ISocketChannel sc, int size) throws IOException, 
NetException {
         while (true) {
             if (size <= 0) {
                 return size;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 96ccafb..f7c3826 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -22,7 +22,6 @@
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
 import java.util.BitSet;
 import java.util.Optional;
 
@@ -31,6 +30,7 @@
 import org.apache.hyracks.api.comm.IConnectionWriterState;
 import org.apache.hyracks.api.comm.MuxDemuxCommand;
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
 import org.apache.hyracks.net.protocols.tcp.TCPConnection;
 import org.apache.hyracks.util.JSONUtil;
@@ -160,6 +160,8 @@
 
         private IChannelControlBlock ccb;
 
+        private boolean pendingWriteCompletion = false;
+
         public WriterState() {
             cmdWriteBuffer = 
ByteBuffer.allocateDirect(MuxDemuxCommand.COMMAND_SIZE);
             cmdWriteBuffer.flip();
@@ -168,7 +170,8 @@
         }
 
         boolean writePending() {
-            return cmdWriteBuffer.remaining() > 0 || (pendingBuffer != null && 
pendingWriteSize > 0);
+            return cmdWriteBuffer.remaining() > 0 || (pendingBuffer != null && 
pendingWriteSize > 0)
+                    || pendingWriteCompletion;
         }
 
         @Override
@@ -181,7 +184,10 @@
             this.ccb = ccb;
         }
 
-        boolean performPendingWrite(SocketChannel sc) throws IOException {
+        boolean performPendingWrite(ISocketChannel sc) throws IOException {
+            if (pendingWriteCompletion && !sc.completeWrite()) {
+                return false;
+            }
             int len = cmdWriteBuffer.remaining();
             if (len > 0) {
                 int written = sc.write(cmdWriteBuffer);
@@ -209,10 +215,16 @@
                 pendingBuffer = null;
                 pendingWriteSize = 0;
             }
+            // must ensure all pending writes are performed before calling 
ccb.writeComplete()
+            if (sc.isPendingWrite()) {
+                pendingWriteCompletion = true;
+                return false;
+            }
             if (ccb != null) {
                 ccb.writeComplete();
                 ccb = null;
             }
+            pendingWriteCompletion = false;
             return true;
         }
 
@@ -223,7 +235,7 @@
     }
 
     void driveWriterStateMachine() throws IOException, NetException {
-        SocketChannel sc = tcpConnection.getSocketChannel();
+        ISocketChannel sc = tcpConnection.getSocketChannel();
         if (writerState.writePending()) {
             if (!writerState.performPendingWrite(sc)) {
                 return;
@@ -339,9 +351,9 @@
     }
 
     void driveReaderStateMachine() throws IOException, NetException {
-        SocketChannel sc = tcpConnection.getSocketChannel();
+        ISocketChannel sc = tcpConnection.getSocketChannel();
         int chunksRead = 0;
-        while (chunksRead < MAX_CHUNKS_READ_PER_CYCLE) {
+        while (chunksRead < MAX_CHUNKS_READ_PER_CYCLE || sc.isPendingRead()) {
             if (readerState.readBuffer.remaining() > 0) {
                 int read = sc.read(readerState.readBuffer);
                 if (read < 0) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
index 4ee7e83..7a08c42 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -25,6 +25,7 @@
 
 import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
 import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.net.protocols.tcp.ITCPConnectionListener;
 import org.apache.hyracks.net.protocols.tcp.TCPConnection;
 import org.apache.hyracks.net.protocols.tcp.TCPEndpoint;
@@ -68,9 +69,12 @@
      *            - Number of threads to use for data transfer
      * @param maxConnectionAttempts
      *            - Maximum number of connection attempts
+     * @param socketChannelFactory
+     *            - The socket channel factor
      */
     public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener 
listener, int nThreads,
-            int maxConnectionAttempts, IChannelInterfaceFactory 
channelInterfaceFatory) {
+            int maxConnectionAttempts, IChannelInterfaceFactory 
channelInterfaceFatory,
+            ISocketChannelFactory socketChannelFactory) {
         this.localAddress = localAddress;
         this.channelOpenListener = listener;
         this.maxConnectionAttempts = maxConnectionAttempts;
@@ -126,7 +130,7 @@
                     }
                 }
             }
-        }, nThreads);
+        }, nThreads, socketChannelFactory);
         perfCounters = new MuxDemuxPerformanceCounters();
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
index ff4627a..1814edb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
@@ -22,8 +22,8 @@
 import java.net.InetSocketAddress;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
 
+import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -38,7 +38,7 @@
 
     private final TCPEndpoint endpoint;
 
-    private final SocketChannel channel;
+    private final ISocketChannel channel;
     private final InetSocketAddress remoteAddress;
     private final SelectionKey key;
 
@@ -50,26 +50,26 @@
 
     private ConnectionType type;
 
-    public TCPConnection(TCPEndpoint endpoint, SocketChannel channel, 
SelectionKey key, Selector selector,
+    public TCPConnection(TCPEndpoint endpoint, ISocketChannel channel, 
SelectionKey key, Selector selector,
             ConnectionType type) {
         this.endpoint = endpoint;
         this.channel = channel;
         this.key = key;
         this.selector = selector;
         this.type = type;
-        remoteAddress = (InetSocketAddress) 
channel.socket().getRemoteSocketAddress();
+        remoteAddress = (InetSocketAddress) 
channel.getSocketChannel().socket().getRemoteSocketAddress();
     }
 
     public TCPEndpoint getEndpoint() {
         return endpoint;
     }
 
-    public SocketChannel getSocketChannel() {
+    public ISocketChannel getSocketChannel() {
         return channel;
     }
 
     public InetSocketAddress getLocalAddress() {
-        return (InetSocketAddress) channel.socket().getLocalSocketAddress();
+        return (InetSocketAddress) 
channel.getSocketChannel().socket().getLocalSocketAddress();
     }
 
     public InetSocketAddress getRemoteAddress() {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index 05e2175..d63078a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -19,6 +19,8 @@
 package org.apache.hyracks.net.protocols.tcp;
 
 import static 
org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType;
+import static 
org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType.INCOMING;
+import static 
org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType.OUTGOING;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -31,7 +33,10 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
+import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
 import org.apache.hyracks.util.NetworkUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -52,9 +57,13 @@
 
     private int nextThread;
 
-    public TCPEndpoint(ITCPConnectionListener connectionListener, int 
nThreads) {
+    private final ISocketChannelFactory socketChannelFactory;
+
+    public TCPEndpoint(ITCPConnectionListener connectionListener, int nThreads,
+            ISocketChannelFactory socketChannelFactory) {
         this.connectionListener = connectionListener;
         this.nThreads = nThreads;
+        this.socketChannelFactory = socketChannelFactory;
     }
 
     public void start(InetSocketAddress localAddress) throws IOException {
@@ -152,7 +161,7 @@
                                     key.attach(address);
                                 } else {
                                     SelectionKey key = 
channel.register(selector, 0);
-                                    createConnection(key, channel);
+                                    socketConnected(key, channel);
                                 }
                             }
                         }
@@ -161,12 +170,12 @@
                     if (!workingIncomingConnections.isEmpty()) {
                         for (SocketChannel channel : 
workingIncomingConnections) {
                             register(channel);
-                            SelectionKey sKey = channel.register(selector, 0);
-                            TCPConnection connection = new 
TCPConnection(TCPEndpoint.this, channel, sKey, selector,
-                                    ConnectionType.INCOMING);
-                            sKey.attach(connection);
-                            synchronized (connectionListener) {
-                                
connectionListener.acceptedConnection(connection);
+                            final SelectionKey sKey = 
channel.register(selector, 0);
+                            ISocketChannel socketChannel = 
socketChannelFactory.createServerChannel(channel);
+                            if (socketChannel.requiresHandshake()) {
+                                asyncHandshake(socketChannel, sKey, INCOMING);
+                            } else {
+                                connectionAccepted(socketChannel, sKey);
                             }
                         }
                         workingIncomingConnections.clear();
@@ -211,7 +220,7 @@
                                     }
                                 }
                                 if (finishConnect) {
-                                    createConnection(key, channel);
+                                    socketConnected(key, channel);
                                 }
                             }
                         }
@@ -222,14 +231,13 @@
             }
         }
 
-        private void createConnection(SelectionKey key, SocketChannel channel) 
{
-            TCPConnection connection =
-                    new TCPConnection(TCPEndpoint.this, channel, key, 
selector, ConnectionType.OUTGOING);
+        private TCPConnection createConnection(ISocketChannel socketChannel, 
SelectionKey key,
+                ConnectionType connectionType) {
+            final TCPConnection connection =
+                    new TCPConnection(TCPEndpoint.this, socketChannel, key, 
selector, connectionType);
             key.attach(connection);
             key.interestOps(0);
-            synchronized (connectionListener) {
-                connectionListener.connectionEstablished(connection);
-            }
+            return connection;
         }
 
         synchronized void initiateConnection(InetSocketAddress remoteAddress) {
@@ -262,5 +270,56 @@
             NetworkUtil.configure(channel);
             channel.configureBlocking(false);
         }
+
+        private void socketConnected(SelectionKey key, SocketChannel channel) {
+            ISocketChannel socketChannel = 
socketChannelFactory.createClientChannel(channel);
+            if (socketChannel.requiresHandshake()) {
+                asyncHandshake(socketChannel, key, OUTGOING);
+            } else {
+                connectionEstablished(socketChannel, key);
+            }
+        }
+
+        private void asyncHandshake(ISocketChannel socketChannel, SelectionKey 
key, ConnectionType connectionType) {
+            
CompletableFuture.supplyAsync(socketChannel::handshake).exceptionally(ex -> 
false)
+                    .thenAccept(handshakeSuccess -> 
handleHandshakeCompletion(handshakeSuccess, socketChannel, key,
+                            connectionType));
+        }
+
+        private void handleHandshakeCompletion(Boolean handshakeSuccess, 
ISocketChannel socketChannel, SelectionKey key,
+                ConnectionType connectionType) {
+            if (handshakeSuccess) {
+                if (connectionType == OUTGOING) {
+                    connectionEstablished(socketChannel, key);
+                } else if (connectionType == INCOMING) {
+                    connectionAccepted(socketChannel, key);
+                }
+            } else {
+                handleHandshakeFailure(socketChannel, key);
+            }
+        }
+
+        private void handleHandshakeFailure(ISocketChannel socketChannel, 
SelectionKey key) {
+            key.cancel();
+            NetworkUtil.closeQuietly(socketChannel);
+            synchronized (connectionListener) {
+                connectionListener.connectionFailure((InetSocketAddress) 
key.attachment(),
+                        new IOException("handshake failure"));
+            }
+        }
+
+        private void connectionEstablished(ISocketChannel socketChannel, 
SelectionKey key) {
+            TCPConnection connection = createConnection(socketChannel, key, 
OUTGOING);
+            synchronized (connectionListener) {
+                connectionListener.connectionEstablished(connection);
+            }
+        }
+
+        private void connectionAccepted(ISocketChannel socketChannel, 
SelectionKey key) {
+            TCPConnection connection = createConnection(socketChannel, key, 
INCOMING);
+            synchronized (connectionListener) {
+                connectionListener.acceptedConnection(connection);
+            }
+        }
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
 
b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
index f9a610c..6d9e7c2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
@@ -28,6 +28,7 @@
 import org.apache.hyracks.api.comm.IBufferFactory;
 import org.apache.hyracks.api.comm.IChannelControlBlock;
 import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.network.ISocketChannel;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelReadInterface;
 import org.apache.hyracks.util.StorageUtil;
@@ -61,7 +62,7 @@
         readInterface.setFullBufferAcceptor(new 
ReadFullBufferAcceptor(fullBufferQ));
         readInterface.setBufferFactory(bufferFactory, RECEIVER_BUFFER_COUNT, 
FRAME_SIZE);
         Assert.assertEquals(EXPECTED_CHANNEL_CREDIT, channelCredit.get());
-        final SocketChannel socketChannel = mockSocketChannel(ccb);
+        final ISocketChannel socketChannel = mockSocketChannel(ccb);
         final Thread networkFrameReader = new Thread(() -> {
             try {
                 int framesRead = FRAMES_TO_READ_COUNT;
@@ -124,8 +125,8 @@
         return ccb;
     }
 
-    private SocketChannel mockSocketChannel(IChannelControlBlock ccb) throws 
IOException {
-        final SocketChannel sc = Mockito.mock(SocketChannel.class);
+    private ISocketChannel mockSocketChannel(IChannelControlBlock ccb) throws 
IOException {
+        final ISocketChannel sc = Mockito.mock(ISocketChannel.class);
         
Mockito.when(sc.read(Mockito.any(ByteBuffer.class))).thenAnswer(invocation -> {
             ccb.addPendingCredits(-FRAME_SIZE);
             final ByteBuffer buffer = invocation.getArgumentAt(0, 
ByteBuffer.class);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
 
b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
index 8f582ba..ec1d21c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
@@ -27,6 +27,7 @@
 
 import org.apache.hyracks.api.comm.IBufferAcceptor;
 import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory;
 import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
 import 
org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
 import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
@@ -162,7 +163,7 @@
             }
         };
         return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), 
md1OpenListener, 1, 5,
-                FullFrameChannelInterfaceFactory.INSTANCE);
+                FullFrameChannelInterfaceFactory.INSTANCE, 
PlainSocketChannelFactory.INSTANCE);
     }
 
     private class ChannelIO {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/3071
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I9cbed93c162018bad17923d50d4987011cbba16c
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>

Reply via email to