Bill commented on a change in pull request #5363:
URL: https://github.com/apache/geode/pull/5363#discussion_r454564779



##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
##########
@@ -47,7 +48,10 @@
 /**
  * NioSslEngine uses an SSLEngine to bind SSL logic to a data source. This class is not thread
  * safe. Its use should be confined to one thread or should be protected by external
- * synchronization.
+ * synchronization.<br>
+ * While some NioSslEngine methods take a Socket as a parameter the given socket must hold
+ * a NIO Channel for i/o operations. If this is not the case these methods will likely throw
+ * a NullPointerException when they attempt to access and use the channel.

Review comment:
       thanks for the tip

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,65 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * This class is currently used for reading direct ack responses It should probably be used for all
  * of the reading done in Connection.
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
-  protected final Connection conn;
+  protected final ClusterConnection conn;
+  private final BufferPool bufferPool;
   protected final Header header = new Header();
-  private final NioFilter ioFilter;
   private ByteBuffer peerNetData;
+  private final InputStream inputStream;
   private final ByteBufferInputStream byteBufferInputStream;
+  private int lastProcessedPosition;
+  private int lastReadPosition;

Review comment:
       ditto

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java
##########
@@ -53,7 +53,7 @@ ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer,
    * wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br>
    * unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.)
    */
-  ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer)
+  ByteBuffer readAtLeast(int amount, ByteBuffer wrappedBuffer, Socket socket)

Review comment:
       This PR eliminates the use of `NioFilter` (interface) and its two implementations, `NioPlainEngine` and `NioSslEngine`, from Geode. Why were changes to these classes and their tests necessary?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1154,46 @@ private Connection(ConnectionTable t, boolean preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      int socketBufferSize =
+          sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
+      socket = getConduit().getSocketCreator().forAdvancedUse().connect(
+          new HostAndPort(remoteID.getHostName(), remoteID.getDirectChannelPort()),
+          0, null, false, socketBufferSize, true);
+      setSocketBufferSize(this.socket, false, socketBufferSize, true);

Review comment:
       how do we know `alreadySetInSocket` should be `true` in this invocation?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/SocketUtils.java
##########
@@ -73,4 +82,49 @@ public static boolean close(final ServerSocket serverSocket) {
     return true;
   }
 
+  /**
+   * Read data from the given socket into the given ByteBuffer. If NIO is supported
+   * we use Channel.read(ByteBuffer). If not we use byte arrays to read available
+   * bytes or buffer.remaining() bytes, whichever is smaller. If buffer.limit is zero
+   * and buffer.remaining is also zero the limit is changed to be buffer.capacity
+   * before reading.
+   *
+   * @return the number of bytes read, which may be -1 for EOF
+   */
+  public static int readFromSocket(Socket socket, ByteBuffer inputBuffer,
+      InputStream socketInputStream) throws IOException {
+    int amountRead;
+    inputBuffer.limit(inputBuffer.capacity());
+    if (socket instanceof SSLSocket) {
+      amountRead = readFromStream(socketInputStream, inputBuffer);
+    } else {
+      amountRead = socket.getChannel().read(inputBuffer);
+    }
+    return amountRead;
+  }
+
+  private static int readFromStream(InputStream stream, ByteBuffer inputBuffer) throws IOException {
+    int amountRead;
+    // if bytes are available we read that number of bytes. Otherwise we do a blocking read
+    // of buffer.remaining() bytes
+    int amountToRead = inputBuffer.remaining();
+    // stream.available() > 0 ? Math.min(stream.available(), inputBuffer.remaining())
+    // : inputBuffer.remaining();
+    if (inputBuffer.hasArray()) {
+      amountRead = stream.read(inputBuffer.array(),
+          inputBuffer.arrayOffset() + inputBuffer.position(), amountToRead);
+      if (amountRead > 0) {
+        inputBuffer.position(inputBuffer.position() + amountRead);
+      }
+    } else {

Review comment:
       Under what conditions would `inputBuffer.hasArray()` be `true` vs. `false`?
   
   What test coverage do we have of these conditions? (I just confirmed that the package-level unit tests for `org.apache.geode.internal.net` don't hit this method at all.)
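
For reference, a minimal standalone sketch of when `hasArray()` returns each value. It is not part of the PR; `HasArrayDemo` and `describe` are hypothetical names. The direct-buffer result reflects OpenJDK behavior, since the `ByteBuffer.allocateDirect` documentation leaves the presence of a backing array unspecified.

```java
import java.nio.ByteBuffer;

// Hypothetical demo class, for illustration only.
class HasArrayDemo {
  /** Classifies a buffer by whether it exposes an accessible backing array. */
  static String describe(ByteBuffer buffer) {
    return buffer.hasArray() ? "array-backed" : "no accessible array";
  }

  public static void main(String[] args) {
    // Heap buffer: backed by a byte[] that array() can return.
    ByteBuffer heap = ByteBuffer.allocate(16);
    // Direct buffer: off-heap memory; OpenJDK reports no accessible array.
    ByteBuffer direct = ByteBuffer.allocateDirect(16);
    // Read-only view of a heap buffer: the array exists but is not accessible.
    ByteBuffer readOnly = heap.asReadOnlyBuffer();

    System.out.println("heap:      " + describe(heap));
    System.out.println("direct:    " + describe(direct));
    System.out.println("read-only: " + describe(readOnly));
  }
}
```

A unit test for `readFromStream` would presumably need at least one heap buffer and one direct (or read-only) buffer to exercise both branches.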

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/ClusterConnection.java
##########
@@ -1142,31 +1154,46 @@ private Connection(ConnectionTable t, boolean preserveOrder, InternalDistributed
 
     InetSocketAddress addr =
         new InetSocketAddress(remoteID.getInetAddress(), remoteID.getDirectChannelPort());
-    SocketChannel channel = SocketChannel.open();
-    owner.addConnectingSocket(channel.socket(), addr.getAddress());
-
-    try {
-      channel.socket().setTcpNoDelay(true);
-      channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
+    int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+    boolean useSSL = getConduit().useSSL();
+    if (useSSL) {
+      int socketBufferSize =
+          sharedResource ? SMALL_BUFFER_SIZE : this.owner.getConduit().tcpBufferSize;
+      socket = getConduit().getSocketCreator().forAdvancedUse().connect(
+          new HostAndPort(remoteID.getHostName(), remoteID.getDirectChannelPort()),
+          0, null, false, socketBufferSize, true);
+      setSocketBufferSize(this.socket, false, socketBufferSize, true);
+    } else {
+      SocketChannel channel = SocketChannel.open();
+      socket = channel.socket();
       // If conserve-sockets is false, the socket can be used for receiving responses, so set the
       // receive buffer accordingly.
       if (!sharedResource) {
-        setReceiveBufferSize(channel.socket(), owner.getConduit().tcpBufferSize);
+        setReceiveBufferSize(socket, owner.getConduit().tcpBufferSize);
       } else {
-        setReceiveBufferSize(channel.socket(), SMALL_BUFFER_SIZE); // make small since only
+        setReceiveBufferSize(socket, SMALL_BUFFER_SIZE); // make small since only
         // receive ack messages
       }
-      setSendBufferSize(channel.socket());
-      channel.configureBlocking(true);
+    }
+    owner.addConnectingSocket(socket, addr.getAddress());
+
+    try {
+      socket.setTcpNoDelay(true);
+      socket.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
-      int connectTime = getP2PConnectTimeout(conduit.getDM().getConfig());
+      setSendBufferSize(socket);
+      if (!useSSL) {
+        socket.getChannel().configureBlocking(true);
+      }
 
       try {
 
-        channel.socket().connect(addr, connectTime);
-
-        createIoFilter(channel, true);
+        if (!useSSL) {
+          // haven't connected yet
+          socket.connect(addr, connectTime);
+        }
+        configureInputStream(socket, true);

Review comment:
       My read of `configureInputStream()` is that it initializes the `inputStream` field and performs the TLS handshake if one is needed. That implies that a connect call happens in that case.
   
   Inasmuch as `configureInputStream()` causes the connect in the TLS case, would it make sense for it to handle the connect in the non-TLS case too? Just thinking of ways to reduce the number of branches on `getConduit().useSSL()`.
   
   Ugh, but now I see that `configureInputStream()` only initiates the TLS handshake if `!clientSocket`. Why does it only initiate the handshake for "server" sockets?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
##########
@@ -14,70 +14,65 @@
  */
 package org.apache.geode.internal.tcp;
 
+import java.io.EOFException;
 import java.io.IOException;
-import java.nio.BufferUnderflowException;
+import java.io.InputStream;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.net.BufferPool;
-import org.apache.geode.internal.net.NioFilter;
+import org.apache.geode.internal.net.SocketUtils;
 import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
  * This class is currently used for reading direct ack responses It should probably be used for all
  * of the reading done in Connection.
  *
  */
 public class MsgReader {
-  private static final Logger logger = LogService.getLogger();
-
-  protected final Connection conn;
+  protected final ClusterConnection conn;
+  private final BufferPool bufferPool;
   protected final Header header = new Header();
-  private final NioFilter ioFilter;
   private ByteBuffer peerNetData;
+  private final InputStream inputStream;
   private final ByteBufferInputStream byteBufferInputStream;
+  private int lastProcessedPosition;

Review comment:
       what does this variable mean? what are its invariants?

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/SocketUtils.java
##########
@@ -73,4 +82,49 @@ public static boolean close(final ServerSocket serverSocket) {
     return true;
   }
 
+  /**
+   * Read data from the given socket into the given ByteBuffer. If NIO is supported
+   * we use Channel.read(ByteBuffer). If not we use byte arrays to read available
+   * bytes or buffer.remaining() bytes, whichever is smaller. If buffer.limit is zero
+   * and buffer.remaining is also zero the limit is changed to be buffer.capacity
+   * before reading.
+   *
+   * @return the number of bytes read, which may be -1 for EOF
+   */

Review comment:
       Update the Javadoc comment to explain the role of `socketInputStream`.
   
   It appears that this input stream is constructed from the socket over in `ClusterConnection.configureInputStream()`.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/net/SocketUtils.java
##########
@@ -73,4 +82,49 @@ public static boolean close(final ServerSocket serverSocket) {
     return true;
   }
 
+  /**
+   * Read data from the given socket into the given ByteBuffer. If NIO is supported
+   * we use Channel.read(ByteBuffer). If not we use byte arrays to read available
+   * bytes or buffer.remaining() bytes, whichever is smaller. If buffer.limit is zero
+   * and buffer.remaining is also zero the limit is changed to be buffer.capacity
+   * before reading.
+   *
+   * @return the number of bytes read, which may be -1 for EOF
+   */
+  public static int readFromSocket(Socket socket, ByteBuffer inputBuffer,
+      InputStream socketInputStream) throws IOException {
+    int amountRead;
+    inputBuffer.limit(inputBuffer.capacity());
+    if (socket instanceof SSLSocket) {
+      amountRead = readFromStream(socketInputStream, inputBuffer);
+    } else {
+      amountRead = socket.getChannel().read(inputBuffer);
+    }

Review comment:
       Are we avoiding use of the `InputStream` in the non-TLS case as a performance enhancement?
   
   Would this method work correctly if the conditional were replaced by:
   
   ```java
          amountRead = readFromStream(socketInputStream, inputBuffer); 
   ```
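
To make the question concrete, here is a rough standalone sketch of a stream-only read path that handles both array-backed and non-array buffers. `StreamToBufferSketch` is a hypothetical name, and the non-array branch is an assumption about what the truncated `else` branch in the diff might do, not a copy of it. Per the `Socket.getInputStream()` documentation, a stream obtained from a channel-backed socket delegates to the channel and throws `IllegalBlockingModeException` if the channel is non-blocking, so this path would rely on the channel being in blocking mode.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Hypothetical sketch, not the PR's implementation.
class StreamToBufferSketch {
  /**
   * Reads up to buffer.remaining() bytes from the stream into the buffer.
   *
   * @return the number of bytes read, or -1 at end of stream
   */
  static int readFromStream(InputStream stream, ByteBuffer buffer) throws IOException {
    int amountToRead = buffer.remaining();
    if (amountToRead == 0) {
      return 0;
    }
    if (buffer.hasArray()) {
      // Heap buffer: read straight into the backing array, then advance position.
      int amountRead = stream.read(buffer.array(),
          buffer.arrayOffset() + buffer.position(), amountToRead);
      if (amountRead > 0) {
        buffer.position(buffer.position() + amountRead);
      }
      return amountRead;
    }
    // Direct or read-protected buffer: stage through a temporary array.
    byte[] staging = new byte[amountToRead];
    int amountRead = stream.read(staging, 0, amountToRead);
    if (amountRead > 0) {
      buffer.put(staging, 0, amountRead);
    }
    return amountRead;
  }

  public static void main(String[] args) throws IOException {
    byte[] data = {1, 2, 3, 4, 5};
    ByteBuffer direct = ByteBuffer.allocateDirect(8);
    int n = readFromStream(new ByteArrayInputStream(data), direct);
    System.out.println("read " + n + " bytes, position now " + direct.position());
  }
}
```

The staging copy in the non-array branch is one plausible cost of going through the stream, which may be part of the answer to the performance question above.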



