Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/3071
Change subject: [ASTERIXDB-2490][NET] Support Encrypted Multiplex Connections
......................................................................
[ASTERIXDB-2490][NET] Support Encrypted Multiplex Connections
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Use SocketChannelFactory in multiplex connections
to support both unencrypted and encrypted sockets.
- Adapt TCPEndpoint to socket channels that require
a handshake.
- Adapt test cases to API changes.
Change-Id: I9cbed93c162018bad17923d50d4987011cbba16c
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
M
hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
M
hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
M hyracks-fullstack/hyracks/hyracks-net/pom.xml
M
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
M
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
M
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
M
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
M
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
M
hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
M
hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
M
hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
17 files changed, 142 insertions(+), 53 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/71/3071/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
index 42ec795..f69c102 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/MessagingChannelReadInterface.java
@@ -19,10 +19,10 @@
package org.apache.asterix.messaging;
import java.io.IOException;
-import java.nio.channels.SocketChannel;
import org.apache.hyracks.api.comm.IBufferAcceptor;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.hyracks.net.protocols.muxdemux.AbstractChannelReadInterface;
public class MessagingChannelReadInterface extends
AbstractChannelReadInterface {
@@ -32,7 +32,7 @@
}
@Override
- public int read(SocketChannel sc, int size) throws IOException,
NetException {
+ public int read(ISocketChannel sc, int size) throws IOException,
NetException {
while (true) {
if (size <= 0) {
return size;
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
index 357d761..4429219 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IChannelReadInterface.java
@@ -19,9 +19,9 @@
package org.apache.hyracks.api.comm;
import java.io.IOException;
-import java.nio.channels.SocketChannel;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
/**
* Represents the read interface of a {@link IChannelControlBlock}.
@@ -68,7 +68,7 @@
* @throws IOException
* @throws NetException
*/
- public int read(SocketChannel sc, int size) throws IOException,
NetException;
+ public int read(ISocketChannel sc, int size) throws IOException,
NetException;
/**
* Sets the read credits of this {@link IChannelReadInterface}
diff --git
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
index c238ae3..5fe6ecb 100644
---
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/net/ClientNetworkManager.java
@@ -23,6 +23,7 @@
import java.net.SocketAddress;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import
org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
@@ -35,11 +36,12 @@
private final MuxDemux md;
- public ClientNetworkManager(int nThreads) throws IOException {
+ public ClientNetworkManager(int nThreads, ISocketChannelFactory
socketChannelFactory) {
/* This is a connect only socket and does not listen to any incoming
connections, so pass null to
* localAddress and listener.
*/
- md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS,
FullFrameChannelInterfaceFactory.INSTANCE);
+ md = new MuxDemux(null, null, nThreads, MAX_CONNECTION_ATTEMPTS,
FullFrameChannelInterfaceFactory.INSTANCE,
+ socketChannelFactory);
}
public void start() throws IOException {
diff --git
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
index a72573c..4d8767f 100644
---
a/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
+++
b/hyracks-fullstack/hyracks/hyracks-client/src/main/java/org/apache/hyracks/client/result/ResultSet.java
@@ -44,7 +44,7 @@
NetworkAddress ddsAddress = hcc.getResultDirectoryAddress();
resultDirectory = new ResultDirectory(ddsAddress.getAddress(),
ddsAddress.getPort(), socketChannelFactory);
- netManager = new ClientNetworkManager(nReaders);
+ netManager = new ClientNetworkManager(nReaders, socketChannelFactory);
netManager.start();
resultClientCtx = new ResultClientContext(frameSize);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index a92fcb6..317d59a 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -273,12 +273,12 @@
resultNetworkManager = new
ResultNetworkManager(ncConfig.getResultListenAddress(),
ncConfig.getResultListenPort(), resultPartitionManager,
ncConfig.getNetThreadCount(),
ncConfig.getNetBufferCount(),
ncConfig.getResultPublicAddress(), ncConfig.getResultPublicPort(),
- FullFrameChannelInterfaceFactory.INSTANCE);
+ FullFrameChannelInterfaceFactory.INSTANCE,
networkSecurityManager.getSocketChannelFactory());
if (ncConfig.getMessagingListenAddress() != null &&
serviceCtx.getMessagingChannelInterfaceFactory() != null) {
messagingNetManager = new MessagingNetworkManager(this,
ncConfig.getMessagingListenAddress(),
ncConfig.getMessagingListenPort(),
ncConfig.getNetThreadCount(),
ncConfig.getMessagingPublicAddress(),
ncConfig.getMessagingPublicPort(),
- serviceCtx.getMessagingChannelInterfaceFactory());
+ serviceCtx.getMessagingChannelInterfaceFactory(),
networkSecurityManager.getSocketChannelFactory());
}
}
@@ -292,7 +292,8 @@
partitionManager = new PartitionManager(this);
netManager = new NetworkManager(ncConfig.getDataListenAddress(),
ncConfig.getDataListenPort(), partitionManager,
ncConfig.getNetThreadCount(), ncConfig.getNetBufferCount(),
ncConfig.getDataPublicAddress(),
- ncConfig.getDataPublicPort(),
FullFrameChannelInterfaceFactory.INSTANCE);
+ ncConfig.getDataPublicPort(),
FullFrameChannelInterfaceFactory.INSTANCE,
+ networkSecurityManager.getSocketChannelFactory());
netManager.start();
startApplication();
init();
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
index a37d131..4c7270c 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/MessagingNetworkManager.java
@@ -32,6 +32,7 @@
import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
@@ -53,10 +54,11 @@
private final Map<IChannelControlBlock, ICloseableBufferAcceptor>
channelFullBufferAcceptor = new HashMap<>();
public MessagingNetworkManager(NodeControllerService ncs, String
inetAddress, int inetPort, int nThreads,
- String publicInetAddress, int publicInetPort,
IChannelInterfaceFactory channelInterfaceFactory) {
+ String publicInetAddress, int publicInetPort,
IChannelInterfaceFactory channelInterfaceFactory,
+ ISocketChannelFactory socketChannelFactory) {
this.ncs = ncs;
md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new
ChannelOpenListener(), nThreads,
- MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
+ MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory,
socketChannelFactory);
publicNetworkAddress = new NetworkAddress(publicInetAddress,
publicInetPort);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
index 3298b78..6876618 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.exceptions.NetException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
import org.apache.hyracks.comm.channels.NetworkOutputChannel;
@@ -60,11 +61,11 @@
public NetworkManager(String inetAddress, int inetPort, PartitionManager
partitionManager, int nThreads,
int nBuffers, String publicInetAddress, int publicInetPort,
- IChannelInterfaceFactory channelInterfaceFactory) {
+ IChannelInterfaceFactory channelInterfaceFactory,
ISocketChannelFactory socketChannelFactory) {
this.partitionManager = partitionManager;
this.nBuffers = nBuffers;
md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new
ChannelOpenListener(), nThreads,
- MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
+ MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory,
socketChannelFactory);
// Just save these values for the moment; may be reset in start()
publicNetworkAddress = new NetworkAddress(publicInetAddress,
publicInetPort);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
index ee821d6..fe8f2af 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/ResultNetworkManager.java
@@ -29,6 +29,7 @@
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.exceptions.NetException;
import org.apache.hyracks.api.job.JobId;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.api.result.IResultPartitionManager;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.comm.channels.IChannelConnectionFactory;
@@ -73,11 +74,11 @@
*/
public ResultNetworkManager(String inetAddress, int inetPort,
IResultPartitionManager partitionManager,
int nThreads, int nBuffers, String publicInetAddress, int
publicInetPort,
- IChannelInterfaceFactory channelInterfaceFactory) {
+ IChannelInterfaceFactory channelInterfaceFactory,
ISocketChannelFactory socketChannelFactory) {
this.partitionManager = partitionManager;
this.nBuffers = nBuffers;
md = new MuxDemux(new InetSocketAddress(inetAddress, inetPort), new
ChannelOpenListener(), nThreads,
- MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory);
+ MAX_CONNECTION_ATTEMPTS, channelInterfaceFactory,
socketChannelFactory);
// Just save these values for the moment; may be reset in start()
publicNetworkAddress = new NetworkAddress(publicInetAddress,
publicInetPort);
}
diff --git a/hyracks-fullstack/hyracks/hyracks-net/pom.xml
b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
index 2cddf45..2dca39b 100644
--- a/hyracks-fullstack/hyracks/hyracks-net/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-net/pom.xml
@@ -47,6 +47,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-ipc</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
index 3a35212..0a26097 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hyracks.api.comm.IChannelControlBlock;
@@ -28,6 +27,7 @@
import org.apache.hyracks.api.comm.IChannelReadInterface;
import org.apache.hyracks.api.comm.IChannelWriteInterface;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
import
org.apache.hyracks.net.protocols.muxdemux.MultiplexedConnection.WriterState;
import org.apache.hyracks.util.JSONUtil;
@@ -91,7 +91,7 @@
wi.writeComplete();
}
- synchronized int read(SocketChannel sc, int size) throws IOException,
NetException {
+ synchronized int read(ISocketChannel sc, int size) throws IOException,
NetException {
return ri.read(sc, size);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
index 3ba8627..ab6dbf1 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/FullFrameChannelReadInterface.java
@@ -20,13 +20,13 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.hyracks.api.comm.IBufferFactory;
import org.apache.hyracks.api.comm.IChannelControlBlock;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -52,7 +52,7 @@
}
@Override
- public int read(SocketChannel sc, int size) throws IOException,
NetException {
+ public int read(ISocketChannel sc, int size) throws IOException,
NetException {
while (true) {
if (size <= 0) {
return size;
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
index 96ccafb..f7c3826 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
@@ -22,7 +22,6 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
import java.util.BitSet;
import java.util.Optional;
@@ -31,6 +30,7 @@
import org.apache.hyracks.api.comm.IConnectionWriterState;
import org.apache.hyracks.api.comm.MuxDemuxCommand;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.hyracks.net.protocols.tcp.ITCPConnectionEventListener;
import org.apache.hyracks.net.protocols.tcp.TCPConnection;
import org.apache.hyracks.util.JSONUtil;
@@ -160,6 +160,8 @@
private IChannelControlBlock ccb;
+ private boolean pendingWriteCompletion = false;
+
public WriterState() {
cmdWriteBuffer =
ByteBuffer.allocateDirect(MuxDemuxCommand.COMMAND_SIZE);
cmdWriteBuffer.flip();
@@ -168,7 +170,8 @@
}
boolean writePending() {
- return cmdWriteBuffer.remaining() > 0 || (pendingBuffer != null &&
pendingWriteSize > 0);
+ return cmdWriteBuffer.remaining() > 0 || (pendingBuffer != null &&
pendingWriteSize > 0)
+ || pendingWriteCompletion;
}
@Override
@@ -181,7 +184,10 @@
this.ccb = ccb;
}
- boolean performPendingWrite(SocketChannel sc) throws IOException {
+ boolean performPendingWrite(ISocketChannel sc) throws IOException {
+ if (pendingWriteCompletion && !sc.completeWrite()) {
+ return false;
+ }
int len = cmdWriteBuffer.remaining();
if (len > 0) {
int written = sc.write(cmdWriteBuffer);
@@ -209,10 +215,16 @@
pendingBuffer = null;
pendingWriteSize = 0;
}
+ // must ensure all pending writes are performed before calling
ccb.writeComplete()
+ if (sc.isPendingWrite()) {
+ pendingWriteCompletion = true;
+ return false;
+ }
if (ccb != null) {
ccb.writeComplete();
ccb = null;
}
+ pendingWriteCompletion = false;
return true;
}
@@ -223,7 +235,7 @@
}
void driveWriterStateMachine() throws IOException, NetException {
- SocketChannel sc = tcpConnection.getSocketChannel();
+ ISocketChannel sc = tcpConnection.getSocketChannel();
if (writerState.writePending()) {
if (!writerState.performPendingWrite(sc)) {
return;
@@ -339,9 +351,9 @@
}
void driveReaderStateMachine() throws IOException, NetException {
- SocketChannel sc = tcpConnection.getSocketChannel();
+ ISocketChannel sc = tcpConnection.getSocketChannel();
int chunksRead = 0;
- while (chunksRead < MAX_CHUNKS_READ_PER_CYCLE) {
+ while (chunksRead < MAX_CHUNKS_READ_PER_CYCLE || sc.isPendingRead()) {
if (readerState.readBuffer.remaining() > 0) {
int read = sc.read(readerState.readBuffer);
if (read < 0) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
index 4ee7e83..7a08c42 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.comm.IChannelInterfaceFactory;
import org.apache.hyracks.api.exceptions.NetException;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.net.protocols.tcp.ITCPConnectionListener;
import org.apache.hyracks.net.protocols.tcp.TCPConnection;
import org.apache.hyracks.net.protocols.tcp.TCPEndpoint;
@@ -68,9 +69,12 @@
* - Number of threads to use for data transfer
* @param maxConnectionAttempts
* - Maximum number of connection attempts
+ * @param socketChannelFactory
+ * - The socket channel factory
*/
public MuxDemux(InetSocketAddress localAddress, IChannelOpenListener
listener, int nThreads,
- int maxConnectionAttempts, IChannelInterfaceFactory
channelInterfaceFatory) {
+ int maxConnectionAttempts, IChannelInterfaceFactory
channelInterfaceFatory,
+ ISocketChannelFactory socketChannelFactory) {
this.localAddress = localAddress;
this.channelOpenListener = listener;
this.maxConnectionAttempts = maxConnectionAttempts;
@@ -126,7 +130,7 @@
}
}
}
- }, nThreads);
+ }, nThreads, socketChannelFactory);
perfCounters = new MuxDemuxPerformanceCounters();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
index ff4627a..1814edb 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPConnection.java
@@ -22,8 +22,8 @@
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
+import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -38,7 +38,7 @@
private final TCPEndpoint endpoint;
- private final SocketChannel channel;
+ private final ISocketChannel channel;
private final InetSocketAddress remoteAddress;
private final SelectionKey key;
@@ -50,26 +50,26 @@
private ConnectionType type;
- public TCPConnection(TCPEndpoint endpoint, SocketChannel channel,
SelectionKey key, Selector selector,
+ public TCPConnection(TCPEndpoint endpoint, ISocketChannel channel,
SelectionKey key, Selector selector,
ConnectionType type) {
this.endpoint = endpoint;
this.channel = channel;
this.key = key;
this.selector = selector;
this.type = type;
- remoteAddress = (InetSocketAddress)
channel.socket().getRemoteSocketAddress();
+ remoteAddress = (InetSocketAddress)
channel.getSocketChannel().socket().getRemoteSocketAddress();
}
public TCPEndpoint getEndpoint() {
return endpoint;
}
- public SocketChannel getSocketChannel() {
+ public ISocketChannel getSocketChannel() {
return channel;
}
public InetSocketAddress getLocalAddress() {
- return (InetSocketAddress) channel.socket().getLocalSocketAddress();
+ return (InetSocketAddress)
channel.getSocketChannel().socket().getLocalSocketAddress();
}
public InetSocketAddress getRemoteAddress() {
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
index 05e2175..d63078a 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
@@ -19,6 +19,8 @@
package org.apache.hyracks.net.protocols.tcp;
import static
org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType;
+import static
org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType.INCOMING;
+import static
org.apache.hyracks.net.protocols.tcp.TCPConnection.ConnectionType.OUTGOING;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -31,7 +33,10 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hyracks.api.network.ISocketChannel;
+import org.apache.hyracks.api.network.ISocketChannelFactory;
import org.apache.hyracks.util.NetworkUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -52,9 +57,13 @@
private int nextThread;
- public TCPEndpoint(ITCPConnectionListener connectionListener, int
nThreads) {
+ private final ISocketChannelFactory socketChannelFactory;
+
+ public TCPEndpoint(ITCPConnectionListener connectionListener, int nThreads,
+ ISocketChannelFactory socketChannelFactory) {
this.connectionListener = connectionListener;
this.nThreads = nThreads;
+ this.socketChannelFactory = socketChannelFactory;
}
public void start(InetSocketAddress localAddress) throws IOException {
@@ -152,7 +161,7 @@
key.attach(address);
} else {
SelectionKey key =
channel.register(selector, 0);
- createConnection(key, channel);
+ socketConnected(key, channel);
}
}
}
@@ -161,12 +170,12 @@
if (!workingIncomingConnections.isEmpty()) {
for (SocketChannel channel :
workingIncomingConnections) {
register(channel);
- SelectionKey sKey = channel.register(selector, 0);
- TCPConnection connection = new
TCPConnection(TCPEndpoint.this, channel, sKey, selector,
- ConnectionType.INCOMING);
- sKey.attach(connection);
- synchronized (connectionListener) {
-
connectionListener.acceptedConnection(connection);
+ final SelectionKey sKey =
channel.register(selector, 0);
+ ISocketChannel socketChannel =
socketChannelFactory.createServerChannel(channel);
+ if (socketChannel.requiresHandshake()) {
+ asyncHandshake(socketChannel, sKey, INCOMING);
+ } else {
+ connectionAccepted(socketChannel, sKey);
}
}
workingIncomingConnections.clear();
@@ -211,7 +220,7 @@
}
}
if (finishConnect) {
- createConnection(key, channel);
+ socketConnected(key, channel);
}
}
}
@@ -222,14 +231,13 @@
}
}
- private void createConnection(SelectionKey key, SocketChannel channel)
{
- TCPConnection connection =
- new TCPConnection(TCPEndpoint.this, channel, key,
selector, ConnectionType.OUTGOING);
+ private TCPConnection createConnection(ISocketChannel socketChannel,
SelectionKey key,
+ ConnectionType connectionType) {
+ final TCPConnection connection =
+ new TCPConnection(TCPEndpoint.this, socketChannel, key,
selector, connectionType);
key.attach(connection);
key.interestOps(0);
- synchronized (connectionListener) {
- connectionListener.connectionEstablished(connection);
- }
+ return connection;
}
synchronized void initiateConnection(InetSocketAddress remoteAddress) {
@@ -262,5 +270,56 @@
NetworkUtil.configure(channel);
channel.configureBlocking(false);
}
+
+ private void socketConnected(SelectionKey key, SocketChannel channel) {
+ ISocketChannel socketChannel =
socketChannelFactory.createClientChannel(channel);
+ if (socketChannel.requiresHandshake()) {
+ asyncHandshake(socketChannel, key, OUTGOING);
+ } else {
+ connectionEstablished(socketChannel, key);
+ }
+ }
+
+ private void asyncHandshake(ISocketChannel socketChannel, SelectionKey
key, ConnectionType connectionType) {
+
CompletableFuture.supplyAsync(socketChannel::handshake).exceptionally(ex ->
false)
+ .thenAccept(handshakeSuccess ->
handleHandshakeCompletion(handshakeSuccess, socketChannel, key,
+ connectionType));
+ }
+
+ private void handleHandshakeCompletion(Boolean handshakeSuccess,
ISocketChannel socketChannel, SelectionKey key,
+ ConnectionType connectionType) {
+ if (handshakeSuccess) {
+ if (connectionType == OUTGOING) {
+ connectionEstablished(socketChannel, key);
+ } else if (connectionType == INCOMING) {
+ connectionAccepted(socketChannel, key);
+ }
+ } else {
+ handleHandshakeFailure(socketChannel, key);
+ }
+ }
+
+ private void handleHandshakeFailure(ISocketChannel socketChannel,
SelectionKey key) {
+ key.cancel();
+ NetworkUtil.closeQuietly(socketChannel);
+ synchronized (connectionListener) {
+ connectionListener.connectionFailure((InetSocketAddress)
key.attachment(),
+ new IOException("handshake failure"));
+ }
+ }
+
+ private void connectionEstablished(ISocketChannel socketChannel,
SelectionKey key) {
+ TCPConnection connection = createConnection(socketChannel, key,
OUTGOING);
+ synchronized (connectionListener) {
+ connectionListener.connectionEstablished(connection);
+ }
+ }
+
+ private void connectionAccepted(ISocketChannel socketChannel,
SelectionKey key) {
+ TCPConnection connection = createConnection(socketChannel, key,
INCOMING);
+ synchronized (connectionListener) {
+ connectionListener.acceptedConnection(connection);
+ }
+ }
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
index f9a610c..6d9e7c2 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/FullFrameChannelReadInterfaceTest.java
@@ -28,6 +28,7 @@
import org.apache.hyracks.api.comm.IBufferFactory;
import org.apache.hyracks.api.comm.IChannelControlBlock;
import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.api.network.ISocketChannel;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelReadInterface;
import org.apache.hyracks.util.StorageUtil;
@@ -61,7 +62,7 @@
readInterface.setFullBufferAcceptor(new
ReadFullBufferAcceptor(fullBufferQ));
readInterface.setBufferFactory(bufferFactory, RECEIVER_BUFFER_COUNT,
FRAME_SIZE);
Assert.assertEquals(EXPECTED_CHANNEL_CREDIT, channelCredit.get());
- final SocketChannel socketChannel = mockSocketChannel(ccb);
+ final ISocketChannel socketChannel = mockSocketChannel(ccb);
final Thread networkFrameReader = new Thread(() -> {
try {
int framesRead = FRAMES_TO_READ_COUNT;
@@ -124,8 +125,8 @@
return ccb;
}
- private SocketChannel mockSocketChannel(IChannelControlBlock ccb) throws
IOException {
- final SocketChannel sc = Mockito.mock(SocketChannel.class);
+ private ISocketChannel mockSocketChannel(IChannelControlBlock ccb) throws
IOException {
+ final ISocketChannel sc = Mockito.mock(ISocketChannel.class);
Mockito.when(sc.read(Mockito.any(ByteBuffer.class))).thenAnswer(invocation -> {
ccb.addPendingCredits(-FRAME_SIZE);
final ByteBuffer buffer = invocation.getArgumentAt(0,
ByteBuffer.class);
diff --git
a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
index 8f582ba..ec1d21c 100644
---
a/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-net/src/test/java/org/apache/hyracks/net/tests/NetTest.java
@@ -27,6 +27,7 @@
import org.apache.hyracks.api.comm.IBufferAcceptor;
import org.apache.hyracks.api.comm.ICloseableBufferAcceptor;
+import org.apache.hyracks.ipc.sockets.PlainSocketChannelFactory;
import org.apache.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import
org.apache.hyracks.net.protocols.muxdemux.FullFrameChannelInterfaceFactory;
import org.apache.hyracks.net.protocols.muxdemux.IChannelOpenListener;
@@ -162,7 +163,7 @@
}
};
return new MuxDemux(new InetSocketAddress("127.0.0.1", 0),
md1OpenListener, 1, 5,
- FullFrameChannelInterfaceFactory.INSTANCE);
+ FullFrameChannelInterfaceFactory.INSTANCE,
PlainSocketChannelFactory.INSTANCE);
}
private class ChannelIO {
--
To view, visit https://asterix-gerrit.ics.uci.edu/3071
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I9cbed93c162018bad17923d50d4987011cbba16c
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>