echobravopapa commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r452410523



##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -94,11 +96,14 @@
  *
  * @since GemFire 2.0
  */
-public class Connection implements Runnable {
+public class ClusterConnection implements Runnable {

Review comment:
       +1 on the rename

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java
##########
@@ -17,24 +17,31 @@
 
 import java.io.EOFException;

Review comment:
       should these classes be renamed since we are using OiO now?

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2909,13 +2950,13 @@ private boolean readMessageHeader(ByteBuffer 
peerDataBuffer) throws IOException
       readerShuttingDown = true;
       requestClose(String.format("Unknown P2P message type: %s",
           nioMessageTypeInteger));
-      return true;
+      return false;

Review comment:
       not obvious why this bool is flipped...

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2588,23 +2596,44 @@ void writeFully(SocketChannel channel, ByteBuffer 
buffer, boolean forceAsync,
           }
           // fall through
         }
-        ByteBuffer wrappedBuffer = ioFilter.wrap(buffer);
-        while (wrappedBuffer.remaining() > 0) {
+        while (buffer.remaining() > 0) {
           int amtWritten = 0;
           long start = stats.startSocketWrite(true);
           try {
-            amtWritten = channel.write(wrappedBuffer);
+            if (socket instanceof SSLSocket) {
+              OutputStream output = socket.getOutputStream();
+              if (buffer.hasArray()) {
+                output.write(buffer.array(), buffer.arrayOffset(),
+                    buffer.limit() - buffer.position());
+                buffer.position(buffer.limit());
+              } else {
+                // socket output streams are FileOutputStreams and have a 
writeable Channel.
+                // This code merely fetches that channel and writes to it.
+                // Channels.newChannel(output).write(buffer);

Review comment:
       dead code...

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1155,47 @@ private Connection(ConnectionTable t, boolean 
preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), 
remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      // int socketBufferSize = -1;
+      int socketBufferSize =
+          sharedResource ? SMALL_BUFFER_SIZE : 
this.owner.getConduit().tcpBufferSize;
+      socket = getConduit().getSocketCreator().forAdvancedUse().connect(
+          new HostAndPort(remoteID.getHostName(), 
remoteID.getDirectChannelPort()),
+          0, null, false, socketBufferSize, true);
+      setSocketBufferSize(this.socket, false, socketBufferSize, true);
+    } else {
+      SocketChannel channel = SocketChannel.open();
+      socket = channel.socket();
       // If conserve-sockets is false, the socket can be used for receiving 
responses, so set the
       // receive buffer accordingly.
       if (!sharedResource) {
-        setReceiveBufferSize(channel.socket(), 
owner.getConduit().tcpBufferSize);
+        setReceiveBufferSize(socket, owner.getConduit().tcpBufferSize);
       } else {
-        setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make 
small since only
+        setReceiveBufferSize(socket, SMALL_BUFFER_SIZE); // make small since 
only
         // receive ack messages
       }
-      setSendBufferSize(channel.socket());
-      channel.configureBlocking(true);
+    }
+    owner.addConnectingSocket(socket, addr.getAddress());
+
+    try {
+      socket.setTcpNoDelay(true);
+      socket.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
-      int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+      setSendBufferSize(socket);
+      if (!useSSL) {
+        socket.getChannel().configureBlocking(true);
+      }
 
       try {
 
-        channel.socket().connect(addr, connectTime);
-
-        createIoFilter(channel, true);
+        if (!useSSL) {
+          // haven't connected yet
+          socket.connect(addr, connectTime);
+        }
+        configureInputStream(socket, true);
 
       } catch (NullPointerException e) {
         // jdk 1.7 sometimes throws an NPE here

Review comment:
       out of scope, but kinda reads like tech debt...

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
##########
@@ -253,7 +253,7 @@ ByteBuffer expandWriteBufferIfNeeded(BufferType type, 
ByteBuffer existing,
     return newBuffer;
   }
 
-  ByteBuffer acquireDirectBuffer(BufferPool.BufferType type, int capacity) {
+  public ByteBuffer acquireDirectBuffer(BufferPool.BufferType type, int 
capacity) {

Review comment:
       what required the addition of `public`?

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2722,9 +2753,10 @@ public void readAck(final DirectReplyProcessor processor)
    * deserialized and passed to TCPConduit for further processing
    */
   private void processInputBuffer() throws ConnectionException, IOException {
+    // BRUCE: simplify this

Review comment:
       do we need a story for this TODO?

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -140,7 +145,7 @@
   private final ConnectionTable owner;
 
   private final TCPConduit conduit;
-  private NioFilter ioFilter;
+  // private NioFilter ioFilter;

Review comment:
       dead code...

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1155,47 @@ private Connection(ConnectionTable t, boolean 
preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), 
remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      // int socketBufferSize = -1;

Review comment:
       dead code...

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -2722,9 +2753,10 @@ public void readAck(final DirectReplyProcessor processor)
    * deserialized and passed to TCPConduit for further processing
    */
   private void processInputBuffer() throws ConnectionException, IOException {
+    // BRUCE: simplify this

Review comment:
       or a git-hook ;)

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -3006,7 +3047,7 @@ private void readMessage(ByteBuffer peerDataBuffer) {
       } catch (IOException ex) {
         // ignored
       }
-    } else /* (nioMessageType == END_CHUNKED_MSG_TYPE) */ {
+    } else /* (messageType == END_CHUNKED_MSG_TYPE) */ {

Review comment:
       looks dead, maybe there was `else if` once upon a time...

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
##########
@@ -202,6 +202,8 @@
 
   private final Stopper stopper = new Stopper();
 
+  private boolean enableTLSOverNIO = true; // 
Boolean.getBoolean("geode.enable-tls-nio");
+

Review comment:
       dead code hanging out...

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,66 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * This class is currently used for reading direct ack responses It should 
probably be used for all
  * of the reading done in Connection.
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
-  protected final Connection conn;
+  protected final ClusterConnection conn;
+  private final BufferPool bufferPool;
   protected final Header header = new Header();
-  private final NioFilter ioFilter;
   private ByteBuffer peerNetData;
+  private final InputStream inputStream;
   private final ByteBufferInputStream byteBufferInputStream;
+  private int lastProcessedPosition;
+  private int lastReadPosition;
 
 
-
-  MsgReader(Connection conn, NioFilter nioFilter, Version version) {
+  MsgReader(ClusterConnection conn, BufferPool bufferPool, InputStream 
inputStream,
+      Version version) {
+    this.bufferPool = bufferPool;
     this.conn = conn;
-    this.ioFilter = nioFilter;
+    this.inputStream = inputStream;
     this.byteBufferInputStream =
         version == null ? new ByteBufferInputStream() : new 
VersionedByteBufferInputStream(version);
   }
 
   Header readHeader() throws IOException {
-    ByteBuffer unwrappedBuffer = readAtLeast(Connection.MSG_HEADER_BYTES);
+    ByteBuffer buffer = readAtLeast(ClusterConnection.MSG_HEADER_BYTES);
 
-    Assert.assertTrue(unwrappedBuffer.remaining() >= 
Connection.MSG_HEADER_BYTES);
-
-    try {
-      int nioMessageLength = unwrappedBuffer.getInt();
-      /* nioMessageVersion = */
-      Connection.calcHdrVersion(nioMessageLength);
-      nioMessageLength = Connection.calcMsgByteSize(nioMessageLength);
-      byte nioMessageType = unwrappedBuffer.get();
-      short nioMsgId = unwrappedBuffer.getShort();
-
-      boolean directAck = (nioMessageType & Connection.DIRECT_ACK_BIT) != 0;
-      if (directAck) {
-        nioMessageType &= ~Connection.DIRECT_ACK_BIT; // clear the ack bit
-      }
+    Assert.assertTrue(buffer.remaining() >= 
ClusterConnection.MSG_HEADER_BYTES);
 
-      header.setFields(nioMessageLength, nioMessageType, nioMsgId);
+    int messageLength = buffer.getInt();
+    /* nioMessageVersion = */

Review comment:
       dead?
   

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
##########
@@ -945,6 +962,10 @@ public boolean useSSL() {
     return useSSL;
   }
 
+  public boolean useDirectReceiveBuffers() {
+    return !useSSL();
+  }
+

Review comment:
       previous question might be resolved following this accessor... for 
ssl/tls we use NIO and !ssl we use the directbuffers, cool and with 6 you get 
eggroll




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to