dschneider-pivotal commented on a change in pull request #6156:
URL: https://github.com/apache/geode/pull/6156#discussion_r650281404



##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
##########
@@ -882,6 +1000,12 @@ void readPayloadFields(final int numParts, final int len) 
throws IOException {
     int readSecurePart = checkAndSetSecurityPart();
 
     int bytesRemaining = len;
+
+    if (this.ioFilter != null) {

Review comment:
       In some cases we have a filter even if not using SSL, NioPlainEngine. 
What is the difference between ioFilter being null and it being an instance of 
NioPlainEngine? Would it be possible to have it never be null? If so, it would 
be nice to get rid of all these null checks

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
##########
@@ -415,5 +417,29 @@ public void write(ByteBufferInputStream.ByteSource source) 
{
     source.sendTo(this.buffer);
   }
 
+  private void flushBuffer(SocketChannel sc, ByteBuffer out, NioFilter 
ioFilter)
+      throws IOException {
+    if (out.position() == 0)
+      return;
+    out.flip();
+    writeToSC(sc, out, ioFilter);
+    out.clear();
+  }
+
+  private void writeToSC(SocketChannel sc, ByteBuffer out, NioFilter ioFilter)

Review comment:
       rename to "writeToSocketChannel"

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
##########
@@ -1455,22 +1478,24 @@ private void handleNewClientConnection(final Socket 
socket) throws IOException {
             "Rejected connection from {} because current connection count of 
{} is greater than or equal to the configured max of {}",
             new Object[] {socket.getInetAddress(), curCnt,
                 maxConnections});
+
         try {
           refuseHandshake(socket.getOutputStream(),
               String.format("exceeded max-connections %s",
                   maxConnections),
-              REPLY_REFUSED);
+              REPLY_REFUSED, ioFilter, socket);

Review comment:
       since you are now passing "socket" into refuseHandshake, you no longer 
need to also pass in socket.getOutputStream()

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
##########
@@ -967,7 +981,8 @@ private void checkForStuckKeys() {
             tmpSel.selectNow(); // clear canceled key
             sc.registerWithSelector2(selector);
           } else {
-            if (tmpsk.isValid() && tmpsk.isReadable()) {
+            if (tmpsk.isValid() && (tmpsk.isReadable() || tmpsk.isWritable())) 
{

Review comment:
       Why did you add isWritable? I think the old code only used non-blocking 
on reads. Are you also now doing non-blocking writes?
   Note this common expression (line 984 and 1129) could be extracted into a 
method

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
##########
@@ -415,5 +417,29 @@ public void write(ByteBufferInputStream.ByteSource source) 
{
     source.sendTo(this.buffer);
   }
 
+  private void flushBuffer(SocketChannel sc, ByteBuffer out, NioFilter 
ioFilter)

Review comment:
       the old code called flushBuffer on the parent class 
BufferDataOutputStream. Would it be possible to add the ioFilter to that 
method? It looks to me like all callers are in HeapDataOutputStream so making 
it a private method in this class is good and you could just remove the unused 
one in BufferDataOutputStream

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationMetadata.java
##########
@@ -47,51 +49,67 @@
   private KnownVersion clientVersion;
   private DataInputStream dataInputStream;
   private DataOutputStream dataOutputStream;
+  private final NioFilter ioFilter;
 
-  ClientRegistrationMetadata(final InternalCache cache, final Socket socket) {
+  ClientRegistrationMetadata(final InternalCache cache, final Socket socket,
+      final NioFilter ioFilter) {
     this.cache = cache;
     this.socket = socket;
     socketMessageWriter = new SocketMessageWriter();
+    this.ioFilter = ioFilter;
   }
 
   boolean initialize() throws IOException {
-    DataInputStream unversionedDataInputStream = new 
DataInputStream(socket.getInputStream());
-    DataOutputStream unversionedDataOutputStream = new 
DataOutputStream(socket.getOutputStream());
-
-    if (getAndValidateClientVersion(socket, unversionedDataInputStream,
-        unversionedDataOutputStream)) {
-      if (oldClientRequiresVersionedStreams(clientVersion)) {
-        dataInputStream =
-            new VersionedDataInputStream(unversionedDataInputStream, 
clientVersion);
-        dataOutputStream =
-            new VersionedDataOutputStream(unversionedDataOutputStream, 
clientVersion);
+    InputStream inputStream = null;
+    try {
+      if (ioFilter == null) {
+        inputStream = socket.getInputStream();
       } else {
-        dataInputStream = unversionedDataInputStream;
-        dataOutputStream = unversionedDataOutputStream;
+        inputStream = ioFilter.getInputStream(socket);

Review comment:
       shouldn't the initialization of inputStream happen right before the try 
(instead of after) since all the finally block does is close the inputStream? 
Also if you then extracted these lines of code into a method you could then 
have: final InputStream inputStream = getInputStream(socket, ioFilter); try ...

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
##########
@@ -1424,10 +1439,18 @@ private void handleNewClientConnection(final Socket 
socket) throws IOException {
     // for 'server to client' communication, send it to the CacheClientNotifier
     // for processing.
     final CommunicationMode communicationMode;
+    final NioFilter ioFilter;
     try {
       if (isSelector()) {
-        communicationMode = getCommunicationModeForSelector(socket);
+        if (socketCreator.forCluster().useSSL()) {
+          ioFilter = createNIOSSLEngine(socket.getChannel());
+          communicationMode = getCommunicationModeForSSLSelector(socket, 
ioFilter);
+        } else {
+          ioFilter = new NioPlainEngine(bufferPool);
+          communicationMode = getCommunicationModeForSelector(socket);

Review comment:
       would calling getCommunicationModeForSSLSelector with our NioPlainEngine 
ioFilter work here? If so we could then get rid of the old 
getCommunicationModeForSelector and just have the one that takes both a socket 
and an ioFilter




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to