zstan commented on code in PR #11036:
URL: https://github.com/apache/ignite/pull/11036#discussion_r1395283977


##########
modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/TcpHandshakeExecutor.java:
##########
@@ -67,228 +68,291 @@ public TcpHandshakeExecutor(IgniteLogger log, ClusterStateProvider stateProvider
      * @param ch Socket channel which using for handshake.
      * @param rmtNodeId Expected remote node.
      * @param sslMeta Required data for ssl.
-     * @param msg Handshake message which should be send during handshake.
+     * @param msg Handshake message which should be sent during handshake.
      * @return Handshake response from predefined variants from {@link 
RecoveryLastReceivedMessage}.
-     * @throws IgniteCheckedException If not related to IO exception happened.
-     * @throws IOException If reading or writing to socket is failed.
+     * @throws IgniteCheckedException If handshake failed.
      */
     public long tcpHandshake(
         SocketChannel ch,
         UUID rmtNodeId,
         GridSslMeta sslMeta,
         HandshakeMessage msg
-    ) throws IgniteCheckedException, IOException {
-        long rcvCnt;
+    ) throws IgniteCheckedException {
+        BlockingTransport transport = stateProvider.isSslEnabled() ?
+            new SslTransport(sslMeta, ch, directBuffer, log) : new 
TcpTransport(ch);
 
-        BlockingSslHandler sslHnd = null;
+        ByteBuffer buf = transport.recieveNodeId();
 
-        ByteBuffer buf;
+        if (buf == null)
+            return NEED_WAIT;
 
-        // Step 1. Get remote node response with the remote nodeId value.
-        if (stateProvider.isSslEnabled()) {
-            assert sslMeta != null;
-
-            sslHnd = new BlockingSslHandler(sslMeta.sslEngine(), ch, 
directBuffer, ByteOrder.LITTLE_ENDIAN, log);
-
-            if (!sslHnd.handshake())
-                throw new HandshakeException("SSL handshake is not 
completed.");
-
-            ByteBuffer handBuff = sslHnd.applicationBuffer();
-
-            if (handBuff.remaining() >= DIRECT_TYPE_SIZE) {
-                short msgType = makeMessageType(handBuff.get(0), 
handBuff.get(1));
-
-                if (msgType == HANDSHAKE_WAIT_MSG_TYPE)
-                    return NEED_WAIT;
-            }
-
-            if (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) {
-                ByteBuffer readBuf = ByteBuffer.allocate(1000);
-
-                while (handBuff.remaining() < NodeIdMessage.MESSAGE_FULL_SIZE) 
{
-                    int read = ch.read(readBuf);
-
-                    if (read == -1)
-                        throw new HandshakeException("Failed to read remote 
node ID (connection closed).");
+        UUID rmtNodeId0 = U.bytesToUuid(buf.array(), DIRECT_TYPE_SIZE);
 
-                    readBuf.flip();
+        if (!rmtNodeId.equals(rmtNodeId0))
+            throw new HandshakeException("Remote node ID is not as expected 
[expected=" + rmtNodeId + ", rcvd=" + rmtNodeId0 + ']');
+        else if (log.isDebugEnabled())
+            log.debug("Received remote node ID: " + rmtNodeId0);
 
-                    sslHnd.decode(readBuf);
+        if (log.isDebugEnabled())
+            log.debug("Writing handshake message [rmtNode=" + rmtNodeId + ", 
msg=" + msg + ']');
 
-                    if (handBuff.remaining() >= DIRECT_TYPE_SIZE) {
-                        break;
-                    }
+        transport.sendHandshake(msg);
 
-                    readBuf.flip();
-                }
+        buf = transport.recieveAcknowledge();
 
-                buf = handBuff;
+        long rcvCnt = buf.getLong(DIRECT_TYPE_SIZE);
 
-                if (handBuff.remaining() >= DIRECT_TYPE_SIZE) {
-                    short msgType = makeMessageType(handBuff.get(0), 
handBuff.get(1));
+        if (log.isDebugEnabled())
+            log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", 
rcvCnt=" + rcvCnt + ']');
 
-                    if (msgType == HANDSHAKE_WAIT_MSG_TYPE)
-                        return NEED_WAIT;
-                }
-            }
-            else
-                buf = handBuff;
+        if (rcvCnt == -1) {
+            if (log.isDebugEnabled())
+                log.debug("Connection rejected, will retry client creation 
[rmtNode=" + rmtNodeId + ']');
         }
-        else {
-            buf = ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE);
 
-            for (int i = 0; i < NodeIdMessage.MESSAGE_FULL_SIZE; ) {
-                int read = ch.read(buf);
+        transport.onHandshakeFinished(sslMeta);
+
+        return rcvCnt;
+    }
 
-                if (read == -1)
+    /**
+     * Encapsulates handshake logic.
+     */
+    private abstract static class BlockingTransport {
+        /**
+         * Receive {@link NodeIdMessage}.
+         *
+         * @return Buffer with {@link NodeIdMessage}.
+         * @throws IgniteCheckedException If failed.
+         */
+        ByteBuffer recieveNodeId() throws IgniteCheckedException {
+            ByteBuffer buf = 
ByteBuffer.allocate(NodeIdMessage.MESSAGE_FULL_SIZE)
+                    .order(ByteOrder.LITTLE_ENDIAN);
+
+            for (int totalBytes = 0; totalBytes < 
NodeIdMessage.MESSAGE_FULL_SIZE; ) {
+                int readBytes = read(buf);
+
+                if (readBytes == -1)
                     throw new HandshakeException("Failed to read remote node 
ID (connection closed).");
 
-                if (read >= DIRECT_TYPE_SIZE) {
+                if (readBytes >= DIRECT_TYPE_SIZE) {
                     short msgType = makeMessageType(buf.get(0), buf.get(1));
 
                     if (msgType == HANDSHAKE_WAIT_MSG_TYPE)
-                        return NEED_WAIT;
+                        return null;
                 }
 
-                i += read;
+                totalBytes += readBytes;
             }
-        }
 
-        UUID rmtNodeId0 = U.bytesToUuid(buf.array(), DIRECT_TYPE_SIZE);
-
-        if (!rmtNodeId.equals(rmtNodeId0))
-            throw new HandshakeException("Remote node ID is not as expected 
[expected=" + rmtNodeId +
-                ", rcvd=" + rmtNodeId0 + ']');
-        else if (log.isDebugEnabled())
-            log.debug("Received remote node ID: " + rmtNodeId0);
-
-        if (stateProvider.isSslEnabled()) {
-            assert sslHnd != null;
-
-            U.writeFully(ch, sslHnd.encrypt(ByteBuffer.wrap(U.IGNITE_HEADER)));
+            return buf;
         }
-        else
-            U.writeFully(ch, ByteBuffer.wrap(U.IGNITE_HEADER));
 
-        // Step 2. Prepare Handshake message to send to the remote node.
-        if (log.isDebugEnabled())
-            log.debug("Writing handshake message [rmtNode=" + rmtNodeId + ", 
msg=" + msg + ']');
-
-        buf = ByteBuffer.allocate(msg.getMessageSize());
-
-        buf.order(ByteOrder.LITTLE_ENDIAN);
+        /**
+         * Send {@link HandshakeMessage} to remote node.
+         *
+         * @param msg Handshake message.
+         * @throws IgniteCheckedException If failed.
+         */
+        void sendHandshake(HandshakeMessage msg) throws IgniteCheckedException 
{
+            ByteBuffer buf = ByteBuffer.allocate(msg.getMessageSize() + 
U.IGNITE_HEADER.length)
+                    .order(ByteOrder.LITTLE_ENDIAN)
+                    .put(U.IGNITE_HEADER);
+
+            msg.writeTo(buf, null);
+            buf.flip();
+
+            write(buf);
+        }
 
-        boolean written = msg.writeTo(buf, null);
+        /**
+         * Receive {@link RecoveryLastReceivedMessage} acknowledge message.
+         *
+         * @return Buffer with message.
+         * @throws IgniteCheckedException If failed.
+         */
+        ByteBuffer recieveAcknowledge() throws IgniteCheckedException {
+            ByteBuffer buf = 
ByteBuffer.allocate(RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE)
+                    .order(ByteOrder.LITTLE_ENDIAN);
 
-        assert written;
+            for (int totalBytes = 0; totalBytes < 
RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; ) {
+                int readBytes = read(buf);
 
-        buf.flip();
+                if (readBytes == -1)
+                    throw new HandshakeException("Failed to read remote node 
recovery handshake " +
+                            "(connection closed).");
 
-        if (stateProvider.isSslEnabled()) {
-            assert sslHnd != null;
+                totalBytes += readBytes;
+            }
 
-            U.writeFully(ch, sslHnd.encrypt(buf));
+            return buf;
         }
-        else
-            U.writeFully(ch, buf);
 
-        if (log.isDebugEnabled())
-            log.debug("Waiting for handshake [rmtNode=" + rmtNodeId + ']');
+        /**
+         * Read data from media.
+         *
+         * @param buf Buffer to read into.
+         * @return Bytes read.
+         * @throws IgniteCheckedException If failed.
+         */
+        abstract int read(ByteBuffer buf) throws IgniteCheckedException;
+
+        /**
+         * Write data fully.
+         * @param buf Buffer to write.
+         * @throws IgniteCheckedException If failed.
+         */
+        abstract void write(ByteBuffer buf) throws IgniteCheckedException;
+
+        /**
+         * Do some post-handshake job if needed.
+         *
+         * @param sslMeta Ssl meta.
+         */
+        void onHandshakeFinished(GridSslMeta sslMeta) {
+            // No-op.
+        }
+    }
 
-        // Step 3. Waiting for response from the remote node with their 
receive count message.
-        if (stateProvider.isSslEnabled()) {
-            assert sslHnd != null;
+    /**
+     * Tcp plaintext transport.
+     */
+    private static class TcpTransport extends BlockingTransport {
+        /** */
+        private final SocketChannel ch;
 
-            buf = ByteBuffer.allocate(1000);
-            buf.order(ByteOrder.LITTLE_ENDIAN);
+        /** */
+        TcpTransport(SocketChannel ch) {
+            this.ch = ch;
+        }
 
-            ByteBuffer decode = ByteBuffer.allocate(2 * buf.capacity());
-            decode.order(ByteOrder.LITTLE_ENDIAN);
+        /** {@inheritDoc} */
+        @Override int read(ByteBuffer buf) throws IgniteCheckedException {
+            try {
+                return ch.read(buf);
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException("Failed to read from 
channel", e);
+            }
+        }
 
-            for (int i = 0; i < RecoveryLastReceivedMessage.MESSAGE_FULL_SIZE; 
) {
-                int read = ch.read(buf);
+        /** {@inheritDoc} */
+        @Override void write(ByteBuffer buf) throws IgniteCheckedException {
+            try {
+                U.writeFully(ch, buf);
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException("Failed to write to channel", 
e);
+            }
+        }
+    }
 
-                if (read == -1)
-                    throw new HandshakeException("Failed to read remote node 
recovery handshake " +
-                        "(connection closed).");
+    /** Ssl transport */
+    private static class SslTransport extends BlockingTransport {
+        /** */
+        private final BlockingSslHandler handler;
 
-                buf.flip();
+        /** */
+        private final SocketChannel ch;
 
-                ByteBuffer decode0 = sslHnd.decode(buf);
+        /** */
+        private final ByteBuffer readBuf;
 
-                i += decode0.remaining();
+        /** */
+        SslTransport(GridSslMeta meta, SocketChannel ch, boolean directBuf, 
IgniteLogger log) throws IgniteCheckedException {
+            try {
+                this.ch = ch;
+                handler = new BlockingSslHandler(meta.sslEngine(), ch, 
directBuf, ByteOrder.LITTLE_ENDIAN, log);
 
-                decode = appendAndResizeIfNeeded(decode, decode0);
+                if (!handler.handshake())
+                    throw new HandshakeException("SSL handshake is not 
completed.");
 
-                buf.clear();
+                readBuf = ByteBuffer.allocate(1024).order(ByteOrder.LITTLE_ENDIAN);

Review Comment:
   ```suggestion
                   readBuf = directBuf ? ByteBuffer.allocateDirect(1024) : ByteBuffer.allocate(1024);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to