sashapolo commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r871562173


##########
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java:
##########
@@ -28,29 +27,23 @@
  */
 public interface HandshakeManager {
     /**
-     * Initialize handshake manager with the channel.
+     * Initializes handshake manager with the channel.
      *
-     * @param channel Channel.
-     * @return Action to perform by {@link HandshakeHandler}.
+     * @param handlerContext Channel handler context.
      */
-    HandshakeResult init(Channel channel);
+    void onInit(ChannelHandlerContext handlerContext);

Review Comment:
   Why did you rename this method?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -75,40 +79,41 @@ public class ConnectionManager {
     /** Node consistent id. */
     private final String consistentId;
 
-    /** Client handshake manager factory. */
-    private final Supplier<HandshakeManager> clientHandshakeManagerFactory;
+    /** Node launch id. As opposed to {@link #consistentId}, this identifier changes between restarts. */
+    private final UUID launchId;
 
     /** Start flag. */
     private final AtomicBoolean started = new AtomicBoolean(false);
 
     /** Stop flag. */
     private final AtomicBoolean stopped = new AtomicBoolean(false);
 
+    /** Recovery descriptor provider. */
+    private final RecoveryDescriptorProvider descriptorProvider = new DefaultRecoveryDescriptorProvider();

Review Comment:
   I would suggest renaming `RecoveryDescriptorProvider` to `RecoveryDescriptorFactory`.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -284,6 +289,14 @@ public boolean isStopped() {
         return stopped.get();
     }
 
+    protected HandshakeManager createClientHandshakeManager(short connectionId) {
+        return new RecoveryClientHandshakeManager(launchId, consistentId, connectionId, FACTORY, descriptorProvider);
+    }
+
+    protected HandshakeManager createServerHandshakeManager() {

Review Comment:
   ```suggestion
       private HandshakeManager createServerHandshakeManager() {
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -221,17 +226,17 @@ private void onNewIncomingChannel(NettySender channel) {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
+    private NettyClient connect(SocketAddress address, short connectionId) {
         var client = new NettyClient(
                 address,
                 serializationService,
-                clientHandshakeManagerFactory.get(),
+                createClientHandshakeManager(connectionId),
                 this::onMessage
         );
 
-        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
+        client.start(clientBootstrap).whenComplete((nodeInfo, throwable) -> {
             if (throwable == null) {
-                channels.put(sender.consistentId(), sender);
+                // No-op

Review Comment:
   I guess we can simply write:
   ```
   if (throwable != null) {
       clients.remove(address);
   }
   ```
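A minimal, dependency-free sketch of the suggested simplification follows. It assumes `clients` maps an address to the pending connection; `ClientRegistry` is hypothetical, and `String` stands in for `SocketAddress` and `NettySender` so the snippet runs on its own:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ConnectionManager's client bookkeeping:
// String replaces SocketAddress and NettySender to keep the sketch self-contained.
class ClientRegistry {
    final Map<String, CompletableFuture<String>> clients = new ConcurrentHashMap<>();

    CompletableFuture<String> connect(String address, CompletableFuture<String> handshake) {
        clients.put(address, handshake);

        // Only the failure branch does any work: forget the client so that
        // the next connection attempt creates a fresh one.
        handshake.whenComplete((sender, throwable) -> {
            if (throwable != null) {
                clients.remove(address);
            }
        });

        return handshake;
    }
}
```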



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Default implementation of the {@link RecoveryDescriptorProvider}.
+ */
+public class DefaultRecoveryDescriptorProvider implements RecoveryDescriptorProvider {
+    // TODO: IGNITE-16954 Make this configurable
+    private static final int DEFAULT_QUEUE_LIMIT = 10;
+
+    /** Recovery descriptors. */
+    private final Map<ChannelKey, RecoveryDescriptor> recoveryDescriptors = new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override
+    public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID launchId, short connectionIndex, boolean inbound) {
+        var key = new ChannelKey(consistentId, launchId, connectionIndex, inbound);
+
+        return recoveryDescriptors.computeIfAbsent(key, channelKey -> new RecoveryDescriptor(DEFAULT_QUEUE_LIMIT));
+    }
+
+    /** Channel key. */
+    private static class ChannelKey {
+        /** Remote node's consistent id. */
+        private final String consistentId;
+
+        /** Remote node's launch id. */
+        private final UUID launchId;
+
+        /**
+         * Connection id. Every connection between this node and a remote node has a unique connection id,
+         * but connections with different nodes may have same ids.

Review Comment:
   ```suggestion
            * but connections with different nodes may have same ids.
   ```
   ```suggestion
            * but connections with different nodes may have the same ids.
   ```
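Since `getRecoveryDescriptor` relies on `computeIfAbsent`, `ChannelKey` needs value-based `equals` and `hashCode` (that part of the class is outside this excerpt). A sketch of such a key under that assumption; `ChannelKeySketch`, `DescriptorRegistrySketch` and the `Object` placeholder for `RecoveryDescriptor` are hypothetical:

```java
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical composite key mirroring the fields of ChannelKey in the diff.
final class ChannelKeySketch {
    private final String consistentId;
    private final UUID launchId;
    private final short connectionIndex;
    private final boolean inbound;

    ChannelKeySketch(String consistentId, UUID launchId, short connectionIndex, boolean inbound) {
        this.consistentId = consistentId;
        this.launchId = launchId;
        this.connectionIndex = connectionIndex;
        this.inbound = inbound;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ChannelKeySketch)) {
            return false;
        }
        ChannelKeySketch that = (ChannelKeySketch) o;
        return connectionIndex == that.connectionIndex
                && inbound == that.inbound
                && consistentId.equals(that.consistentId)
                && launchId.equals(that.launchId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(consistentId, launchId, connectionIndex, inbound);
    }
}

// Hypothetical provider: exactly one descriptor per distinct key.
class DescriptorRegistrySketch {
    private final Map<ChannelKeySketch, Object> descriptors = new ConcurrentHashMap<>();

    Object get(String consistentId, UUID launchId, short connectionIndex, boolean inbound) {
        ChannelKeySketch key = new ChannelKeySketch(consistentId, launchId, connectionIndex, inbound);
        return descriptors.computeIfAbsent(key, k -> new Object());
    }
}
```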



##########
modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java:
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
+import org.apache.ignite.internal.network.serialization.SerializationService;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.TestMessagesFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Recovery protocol handshake flow test.
+ */
+public class RecoveryHandshakeTest {
+    /** Connection id. */
+    private static final short CONNECTION_ID = 1337;
+
+    /** Serialization registry. */
+    private static final MessageSerializationRegistry MESSAGE_REGISTRY = new TestMessageSerializationRegistryImpl();
+
+    /** Message factory. */
+    private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
+
+    /** Test message factory. */
+    private static final TestMessagesFactory TEST_MESSAGES_FACTORY = new TestMessagesFactory();
+
+    @Test
+    public void testHandshake() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);

Review Comment:
   Please check the `var` usages in this class; many of them violate the code style.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -60,6 +63,7 @@ public class ConnectionManager {
     /** Server. */
     private final NettyServer server;
 
+    // TODO: IGNITE-16948 Should be a map consistentId -> connectionId -> sender

Review Comment:
   IGNITE-16948 does not explain why this should be a different map.
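For comparison, a nested map along the lines of the TODO could look like the sketch below. The class name, the `Short` keys and the `String` placeholder for `NettySender` are assumptions, not the PR's design:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical two-level index: consistentId -> connectionId -> sender.
// String stands in for NettySender.
class SenderMapSketch {
    private final Map<String, Map<Short, String>> channels = new ConcurrentHashMap<>();

    void put(String consistentId, short connectionId, String sender) {
        // Create the per-node map lazily, then register the connection in it.
        channels.computeIfAbsent(consistentId, id -> new ConcurrentHashMap<>())
                .put(connectionId, sender);
    }

    String get(String consistentId, short connectionId) {
        Map<Short, String> byConnection = channels.get(consistentId);
        return byConnection == null ? null : byConnection.get(connectionId);
    }
}
```

Unlike a flat `consistentId -> sender` map, this shape can hold several connections to the same node at once.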



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -221,17 +226,17 @@ private void onNewIncomingChannel(NettySender channel) {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
+    private NettyClient connect(SocketAddress address, short connectionId) {

Review Comment:
   The `connectionId` parameter is always 0. Do I understand correctly that this will be changed under IGNITE-16948?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/BaseRecoveryHandshakeManager.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.netty.HandshakeHandler;
+import org.apache.ignite.internal.network.netty.MessageHandler;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Base recovery handshake manager.
+ */
+public abstract class BaseRecoveryHandshakeManager implements HandshakeManager {
+    /** Handshake completion future. */
+    protected final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>();
+
+    /** Remote node's launch id. */
+    protected UUID remoteLaunchId;
+
+    /** Remote node's consistent id. */
+    protected String remoteConsistentId;
+
+    /** Connection id. */
+    protected short connectionId;
+
+    /** Netty pipeline channel handler context. */
+    protected ChannelHandlerContext ctx;
+
+    /** Channel. */
+    protected Channel channel;
+
+    /** Netty pipeline handshake handler. */
+    protected HandshakeHandler handler;
+
+    /** Recovery descriptor. */
+    protected RecoveryDescriptor recoveryDescriptor;
+
+    /** {@inheritDoc} */
+    @Override
+    public void onInit(ChannelHandlerContext handlerContext) {
+        this.ctx = handlerContext;
+        this.channel = handlerContext.channel();
+        this.handler = (HandshakeHandler) ctx.handler();
+    }
+
+    /**
+     * Creates a message handler using the consistent id of a remote node.
+     *
+     * @return New message handler.
+     */
+    public MessageHandler createMessageHandler() {
+        return handler.createMessageHandler(remoteConsistentId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NettySender> handshakeFuture() {
+        return handshakeCompleteFuture;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void onConnectionOpen() {

Review Comment:
   Why do you need to override this method here?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/BaseRecoveryHandshakeManager.java:
##########
@@ -0,0 +1,101 @@
+package org.apache.ignite.internal.network.recovery;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.netty.HandshakeHandler;
+import org.apache.ignite.internal.network.netty.MessageHandler;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Base recovery handshake manager.
+ */
+public abstract class BaseRecoveryHandshakeManager implements HandshakeManager {
+    /** Handshake completion future. */
+    protected final CompletableFuture<NettySender> handshakeCompleteFuture = new CompletableFuture<>();
+
+    /** Remote node's launch id. */
+    protected UUID remoteLaunchId;
+
+    /** Remote node's consistent id. */
+    protected String remoteConsistentId;
+
+    /** Connection id. */
+    protected short connectionId;

Review Comment:
   This field is not used in this class; I would suggest encapsulating it in the child classes. The same applies to `recoveryDescriptor`.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java:
##########
@@ -28,29 +27,23 @@
  */
 public interface HandshakeManager {
     /**
-     * Initialize handshake manager with the channel.
+     * Initializes handshake manager with the channel.

Review Comment:
   ```suggestion
        * Initializes the handshake manager.
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java:
##########
@@ -87,34 +83,75 @@ public HandshakeResult onConnectionOpen(Channel channel) {
                 );
             }
         });
-
-        return HandshakeResult.noOp();
     }
 
     /** {@inheritDoc} */
     @Override
-    public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
+    public void onMessage(NetworkMessage message) {
         if (message instanceof HandshakeStartResponseMessage) {
             HandshakeStartResponseMessage msg = (HandshakeStartResponseMessage) message;
 
-            UUID remoteLaunchId = msg.launchId();
-            String remoteConsistentId = msg.consistentId();
+            this.remoteLaunchId = msg.launchId();
+            this.remoteConsistentId = msg.consistentId();
+            this.receivedCount = msg.receivedCount();
+            this.connectionId = msg.connectionId();
+
+            this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(remoteConsistentId, remoteLaunchId,
+                    connectionId, true);
 
-            handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
+            handshake(recoveryDescriptor);
 
-            return HandshakeResult.removeHandler(remoteLaunchId, remoteConsistentId);
+            return;
         }
 
-        handshakeCompleteFuture.completeExceptionally(
-                new HandshakeException("Unexpected message during handshake: " + message.toString())
-        );
+        assert recoveryDescriptor != null : "Wrong server handshake flow";
+
+        if (recoveryDescriptor.unacknowledgedCount() == 0) {
+            finishHandshake();
+        }
 
-        return HandshakeResult.fail();
+        ctx.fireChannelRead(message);
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<NettySender> handshakeFuture() {
-        return handshakeCompleteFuture;
+    private void handshake(RecoveryDescriptor descriptor) {
+        PipelineUtils.afterHandshake(ctx.pipeline(), descriptor, createMessageHandler(), messageFactory);
+
+        HandshakeFinishMessage response = messageFactory.handshakeFinishMessage()
+                .receivedCount(descriptor.receivedCount())
+                .build();
+
+        CompletableFuture<Void> sendFuture = NettyUtils.toCompletableFuture(
+                ctx.channel().writeAndFlush(new OutNetworkObject(response, Collections.emptyList(), false))
+        );
+
+        descriptor.acknowledge(receivedCount);
+
+        int unacknowledgedCount = (int) descriptor.unacknowledgedCount();
+
+        if (unacknowledgedCount > 0) {
+            var futs = new CompletableFuture[unacknowledgedCount + 1];
+            futs[0] = sendFuture;
+
+            List<OutNetworkObject> networkMessages = descriptor.unacknowledgedMessages();
+
+            for (int i = 0; i < networkMessages.size(); i++) {
+                OutNetworkObject networkMessage = networkMessages.get(i);
+                futs[i + 1] = NettyUtils.toCompletableFuture(ctx.channel().writeAndFlush(networkMessage));

Review Comment:
   Should we use `write` here and only `flush` at the end?
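Netty's `Channel` offers `write(Object)` and `flush()` in addition to `writeAndFlush(Object)`, so the resend loop could queue every message and flush once. The sketch below models this with a hypothetical in-memory `BufferedChannelSketch` (not Netty) so it runs without dependencies; in the PR, each future would presumably come from `NettyUtils.toCompletableFuture(ctx.channel().write(...))`, followed by a single `ctx.channel().flush()`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical in-memory channel: write() only buffers, flush() "sends" everything
// buffered so far and completes the matching futures (loosely mimicking Netty).
class BufferedChannelSketch {
    private final List<String> pending = new ArrayList<>();
    private final List<CompletableFuture<Void>> pendingFutures = new ArrayList<>();
    final List<String> sent = new ArrayList<>();
    int flushCount;

    CompletableFuture<Void> write(String message) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.add(message);
        pendingFutures.add(future);
        return future;
    }

    void flush() {
        flushCount++;
        sent.addAll(pending);
        pending.clear();
        List<CompletableFuture<Void>> toComplete = new ArrayList<>(pendingFutures);
        pendingFutures.clear();
        toComplete.forEach(f -> f.complete(null));
    }
}

class ResendSketch {
    // Writes the handshake response plus all unacknowledged messages, then flushes once.
    static CompletableFuture<Void> resend(BufferedChannelSketch channel, String response, List<String> unacknowledged) {
        CompletableFuture<?>[] futures = new CompletableFuture<?>[unacknowledged.size() + 1];
        futures[0] = channel.write(response);

        for (int i = 0; i < unacknowledged.size(); i++) {
            futures[i + 1] = channel.write(unacknowledged.get(i));
        }

        channel.flush();

        return CompletableFuture.allOf(futures);
    }
}
```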



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -221,17 +226,17 @@ private void onNewIncomingChannel(NettySender channel) {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
+    private NettyClient connect(SocketAddress address, short connectionId) {
         var client = new NettyClient(
                 address,
                 serializationService,
-                clientHandshakeManagerFactory.get(),
+                createClientHandshakeManager(connectionId),
                 this::onMessage
         );
 
-        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
+        client.start(clientBootstrap).whenComplete((nodeInfo, throwable) -> {
             if (throwable == null) {
-                channels.put(sender.consistentId(), sender);

Review Comment:
   Why did you remove this line?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -221,17 +226,17 @@ private void onNewIncomingChannel(NettySender channel) {
      * @param address Target address.
      * @return New netty client.
      */
-    private NettyClient connect(SocketAddress address) {
+    private NettyClient connect(SocketAddress address, short connectionId) {
         var client = new NettyClient(
                 address,
                 serializationService,
-                clientHandshakeManagerFactory.get(),
+                createClientHandshakeManager(connectionId),
                 this::onMessage
         );
 
-        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
+        client.start(clientBootstrap).whenComplete((nodeInfo, throwable) -> {

Review Comment:
   `nodeInfo`?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -284,6 +289,14 @@ public boolean isStopped() {
         return stopped.get();
     }
 
+    protected HandshakeManager createClientHandshakeManager(short connectionId) {

Review Comment:
   ```suggestion
       private HandshakeManager createClientHandshakeManager(short connectionId) {
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryServerHandshakeManager.java:
##########
@@ -87,34 +83,75 @@ public HandshakeResult onConnectionOpen(Channel channel) {
                 );
             }
         });
-
-        return HandshakeResult.noOp();
     }
 
     /** {@inheritDoc} */
     @Override
-    public HandshakeResult onMessage(Channel channel, NetworkMessage message) {
+    public void onMessage(NetworkMessage message) {
         if (message instanceof HandshakeStartResponseMessage) {
             HandshakeStartResponseMessage msg = (HandshakeStartResponseMessage) message;
 
-            UUID remoteLaunchId = msg.launchId();
-            String remoteConsistentId = msg.consistentId();
+            this.remoteLaunchId = msg.launchId();
+            this.remoteConsistentId = msg.consistentId();
+            this.receivedCount = msg.receivedCount();
+            this.connectionId = msg.connectionId();
+
+            this.recoveryDescriptor = recoveryDescriptorProvider.getRecoveryDescriptor(remoteConsistentId, remoteLaunchId,
+                    connectionId, true);
 
-            handshakeCompleteFuture.complete(new NettySender(channel, remoteLaunchId.toString(), remoteConsistentId));
+            handshake(recoveryDescriptor);
 
-            return HandshakeResult.removeHandler(remoteLaunchId, remoteConsistentId);
+            return;
         }
 
-        handshakeCompleteFuture.completeExceptionally(
-                new HandshakeException("Unexpected message during handshake: " + message.toString())
-        );
+        assert recoveryDescriptor != null : "Wrong server handshake flow";
+
+        if (recoveryDescriptor.unacknowledgedCount() == 0) {
+            finishHandshake();
+        }
 
-        return HandshakeResult.fail();
+        ctx.fireChannelRead(message);
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<NettySender> handshakeFuture() {
-        return handshakeCompleteFuture;
+    private void handshake(RecoveryDescriptor descriptor) {
+        PipelineUtils.afterHandshake(ctx.pipeline(), descriptor, createMessageHandler(), messageFactory);
+
+        HandshakeFinishMessage response = messageFactory.handshakeFinishMessage()
+                .receivedCount(descriptor.receivedCount())
+                .build();
+
+        CompletableFuture<Void> sendFuture = NettyUtils.toCompletableFuture(
+                ctx.channel().writeAndFlush(new OutNetworkObject(response, Collections.emptyList(), false))
+        );
+
+        descriptor.acknowledge(receivedCount);
+
+        int unacknowledgedCount = (int) descriptor.unacknowledgedCount();

Review Comment:
   Why did you make `unacknowledgedCount` a `long` if you cast it to `int`?
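If the counter does stay a `long`, a checked narrowing such as `Math.toIntExact` fails with an `ArithmeticException` instead of silently keeping the low 32 bits, as the plain `(int)` cast does. A small sketch with a hypothetical helper class:

```java
// Hypothetical helper contrasting checked and unchecked narrowing of a long counter.
final class CountSketch {
    static int toArrayLength(long unacknowledgedCount) {
        // Math.toIntExact throws ArithmeticException if the value does not fit in an int.
        return Math.toIntExact(unacknowledgedCount);
    }

    static int uncheckedCast(long unacknowledgedCount) {
        // A plain cast keeps only the low 32 bits, so large values wrap around silently.
        return (int) unacknowledgedCount;
    }
}
```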



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/BaseRecoveryHandshakeManager.java:
##########
@@ -0,0 +1,101 @@
+package org.apache.ignite.internal.network.recovery;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.netty.HandshakeHandler;
+import org.apache.ignite.internal.network.netty.MessageHandler;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Base recovery handshake manager.
+ */
+public abstract class BaseRecoveryHandshakeManager implements HandshakeManager {

Review Comment:
   I actually find this class very confusing, and the inheritors' logic is very difficult to understand. Is it possible to get rid of it, even at the cost of some copy-paste?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Recovery protocol descriptor.
+ */
+public class RecoveryDescriptor {
+    /** Unacknowledged messages. */
+    private final ArrayDeque<OutNetworkObject> unacknowledgedMessages;

Review Comment:
   ```suggestion
       private final Queue<OutNetworkObject> unacknowledgedMessages;
   ```
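With the suggested type, only the field declaration names the interface and `ArrayDeque` stays an implementation detail. The sketch below also models acknowledgement under an assumption the excerpt does not confirm: `receivedCount` is the cumulative number of messages the remote side has received, and `acknowledge` drops messages from the head of the queue until that count is reached. `AckQueueSketch` is hypothetical:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical simplification of RecoveryDescriptor's bookkeeping.
class AckQueueSketch<T> {
    // Declared as Queue; ArrayDeque is only the chosen implementation.
    private final Queue<T> unacknowledgedMessages = new ArrayDeque<>();

    // Number of messages acknowledged so far (assumed cumulative semantics).
    private long acknowledgedCount;

    void add(T message) {
        unacknowledgedMessages.add(message);
    }

    // Assumption: receivedCount is cumulative, so drop messages until our count catches up.
    void acknowledge(long receivedCount) {
        while (acknowledgedCount < receivedCount && !unacknowledgedMessages.isEmpty()) {
            unacknowledgedMessages.poll();
            acknowledgedCount++;
        }
    }

    int unacknowledgedCount() {
        return unacknowledgedMessages.size();
    }

    List<T> unacknowledgedMessages() {
        return new ArrayList<>(unacknowledgedMessages);
    }
}
```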



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -183,7 +188,7 @@ public CompletableFuture<NettySender> channel(@Nullable String consistentId, Soc
         // or didn't perform the handhsake operaton, can be reused.

Review Comment:
   ```suggestion
           // or didn't perform the handshake operation, can be reused.
   ```



##########
modules/network/src/test/java/org/apache/ignite/internal/network/netty/RecoveryHandshakeTest.java:
##########
@@ -0,0 +1,436 @@
+package org.apache.ignite.internal.network.netty;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryClientHandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.network.recovery.RecoveryServerHandshakeManager;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorFactory;
+import org.apache.ignite.internal.network.serialization.ClassDescriptorRegistry;
+import org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
+import org.apache.ignite.internal.network.serialization.SerializationService;
+import org.apache.ignite.internal.network.serialization.UserObjectSerializationContext;
+import org.apache.ignite.internal.network.serialization.marshal.DefaultUserObjectMarshaller;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+import org.apache.ignite.network.TestMessage;
+import org.apache.ignite.network.TestMessageSerializationRegistryImpl;
+import org.apache.ignite.network.TestMessagesFactory;
+import org.apache.ignite.network.serialization.MessageSerializationRegistry;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Recovery protocol handshake flow test.
+ */
+public class RecoveryHandshakeTest {
+    /** Connection id. */
+    private static final short CONNECTION_ID = 1337;
+
+    /** Serialization registry. */
+    private static final MessageSerializationRegistry MESSAGE_REGISTRY = new TestMessageSerializationRegistryImpl();
+
+    /** Message factory. */
+    private static final NetworkMessagesFactory MESSAGE_FACTORY = new NetworkMessagesFactory();
+
+    /** Test message factory. */
+    private static final TestMessagesFactory TEST_MESSAGES_FACTORY = new TestMessagesFactory();
+
+    @Test
+    public void testHandshake() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
+        var serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
+
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+
+        assertTrue(serverSideChannel.isActive());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        assertNull(clientSideChannel.readOutbound());
+        assertNull(serverSideChannel.readOutbound());
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+    }
+
+    @Test
+    public void testHandshakeWithUnacknowledgedServerMessage() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        UUID clientLaunchId = UUID.randomUUID();
+        RecoveryDescriptor serverRecoveryDescriptor = serverRecovery.getRecoveryDescriptor("client", clientLaunchId, CONNECTION_ID, true);
+        addUnacknowledgedMessages(serverRecoveryDescriptor);
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager("client", clientLaunchId, clientRecovery);
+        var serverHandshakeManager = createRecoveryServerHandshakeManager(serverRecovery);
+
+        var messageCaptor = new AtomicReference<TestMessage>();
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, (inObject) -> {
+            NetworkMessage msg = inObject.message();
+
+            assertInstanceOf(TestMessage.class, msg);
+
+            messageCaptor.set((TestMessage) msg);
+        });
+
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, noMessageListener);
+
+        assertTrue(serverSideChannel.isActive());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        assertNull(clientSideChannel.readOutbound());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        assertNull(serverSideChannel.readOutbound());
+
+        TestMessage ackedMessage = messageCaptor.get();
+        assertNotNull(ackedMessage);
+
+        checkHandshakeNotCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+    }
+
+    @Test
+    public void testHandshakeWithUnacknowledgedClientMessage() throws Exception {
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        UUID serverLaunchId = UUID.randomUUID();
+        RecoveryDescriptor clientRecoveryDescriptor = clientRecovery.getRecoveryDescriptor("server", serverLaunchId, CONNECTION_ID, false);
+        addUnacknowledgedMessages(clientRecoveryDescriptor);
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager(clientRecovery);
+        var serverHandshakeManager = createRecoveryServerHandshakeManager("server", serverLaunchId, serverRecovery);
+
+        var messageCaptor = new AtomicReference<TestMessage>();
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, noMessageListener);
+
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, (inObject) -> {
+            NetworkMessage msg = inObject.message();
+
+            assertInstanceOf(TestMessage.class, msg);
+
+            messageCaptor.set((TestMessage) msg);
+        });
+
+        assertTrue(serverSideChannel.isActive());
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        assertNull(serverSideChannel.readOutbound());
+
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        assertNull(clientSideChannel.readOutbound());
+
+        TestMessage ackedMessage = messageCaptor.get();
+        assertNotNull(ackedMessage);
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeNotCompleted(clientHandshakeManager);
+
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        checkHandshakeCompleted(serverHandshakeManager);
+        checkHandshakeCompleted(clientHandshakeManager);
+
+        checkPipelineAfterHandshake(serverSideChannel);
+        checkPipelineAfterHandshake(clientSideChannel);
+    }
+
+    @Test
+    public void testPairedRecoveryDescriptors() throws Exception {
+        RecoveryDescriptorProvider node1Recovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider node2Recovery = createRecoveryDescriptorProvider();
+
+        var node1Uuid = UUID.randomUUID();
+        var node2Uuid = UUID.randomUUID();
+
+        var chm1 = createRecoveryClientHandshakeManager("client", node1Uuid, node1Recovery);
+        var shm1 = createRecoveryServerHandshakeManager("client", node1Uuid, node1Recovery);
+
+        var chm2 = createRecoveryClientHandshakeManager("server", node2Uuid, node2Recovery);
+        var shm2 = createRecoveryServerHandshakeManager("server", node2Uuid, node2Recovery);
+
+        EmbeddedChannel out1to2 = setupChannel(chm1, noMessageListener);
+        EmbeddedChannel in1to2 = setupChannel(shm1, noMessageListener);
+        EmbeddedChannel out2to1 = setupChannel(chm2, noMessageListener);
+        EmbeddedChannel in2to1 = setupChannel(shm2, noMessageListener);
+
+        exchangeServerToClient(in1to2, out2to1);
+        exchangeServerToClient(in2to1, out1to2);
+
+        exchangeClientToServer(in1to2, out2to1);
+        exchangeClientToServer(in2to1, out1to2);
+
+        assertNotSame(chm1.recoveryDescriptor(), shm1.recoveryDescriptor());
+        assertNotSame(chm2.recoveryDescriptor(), shm2.recoveryDescriptor());
+    }
+
+    @Test
+    public void testExactlyOnceServer() throws Exception {
+        testExactlyOnce(true);
+    }
+
+    @Test
+    public void testExactlyOnceClient() throws Exception {
+        testExactlyOnce(false);
+    }
+
+    /**
+     * Tests that a message is received exactly once if the network fails during acknowledgement.
+     *
+     * @param serverDidntReceiveAck {@code true} if the server didn't receive the acknowledgement, {@code false} if the client didn't receive
+     *                              the acknowledgement.
+     * @throws Exception If failed.
+     */
+    private void testExactlyOnce(boolean serverDidntReceiveAck) throws Exception {
+        var server = "server";
+        var serverLaunchId = UUID.randomUUID();
+        var client = "client";
+        var clientLaunchId = UUID.randomUUID();
+
+        RecoveryDescriptorProvider clientRecovery = createRecoveryDescriptorProvider();
+        RecoveryDescriptorProvider serverRecovery = createRecoveryDescriptorProvider();
+
+        var clientHandshakeManager = createRecoveryClientHandshakeManager(client, clientLaunchId, clientRecovery);
+        var serverHandshakeManager = createRecoveryServerHandshakeManager(server, serverLaunchId, serverRecovery);
+
+        var receivedFirst = new AtomicBoolean();
+
+        var listener1 = new MessageListener("1", receivedFirst);
+
+        EmbeddedChannel clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener1 : noMessageListener);
+        EmbeddedChannel serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener1);
+
+        // Normal handshake
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        var ch = serverDidntReceiveAck ? serverSideChannel : clientSideChannel;
+
+        // Add two messages to the outbound
+        ch.writeOutbound(new OutNetworkObject(TEST_MESSAGES_FACTORY.testMessage().msg("1").build(), Collections.emptyList()));
+        ch.writeOutbound(new OutNetworkObject(TEST_MESSAGES_FACTORY.testMessage().msg("2").build(), Collections.emptyList()));
+
+        // Send one of the messages
+        if (serverDidntReceiveAck) {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        }
+
+        // Message should be received
+        assertTrue(receivedFirst.get());
+
+        // Transfer only one acknowledgement, don't transfer the second one (simulates network failure on acknowledgement)
+        if (serverDidntReceiveAck) {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        }
+
+        // Simulate reconnection
+        clientHandshakeManager = createRecoveryClientHandshakeManager(client, clientLaunchId, clientRecovery);
+        serverHandshakeManager = createRecoveryServerHandshakeManager(server, serverLaunchId, serverRecovery);
+
+        var receivedSecond = new AtomicBoolean();
+
+        var listener2 = new MessageListener("2", receivedSecond);
+
+        clientSideChannel = setupChannel(clientHandshakeManager, serverDidntReceiveAck ? listener2 : noMessageListener);
+        serverSideChannel = setupChannel(serverHandshakeManager, serverDidntReceiveAck ? noMessageListener : listener2);
+
+        // Handshake
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+        exchangeClientToServer(serverSideChannel, clientSideChannel);
+        exchangeServerToClient(serverSideChannel, clientSideChannel);
+
+        // Resending message
+        if (serverDidntReceiveAck) {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        }
+
+        // Send another acknowledgement
+        if (serverDidntReceiveAck) {
+            exchangeClientToServer(serverSideChannel, clientSideChannel);
+        } else {
+            exchangeServerToClient(serverSideChannel, clientSideChannel);
+        }
+
+        assertNull(serverSideChannel.readOutbound());
+        assertNull(clientSideChannel.readOutbound());
+
+        assertTrue(receivedSecond.get());
+    }
+
+    /** Message listener that accepts a specific message only once. */
+    private static class MessageListener implements Consumer<InNetworkObject> {
+        /** Expected message. */
+        private final String expectedMessage;
+
+        /** Flag indicating that the expected message was received. */
+        private final AtomicBoolean flag;
+
+        private MessageListener(String expectedMessage, AtomicBoolean flag) {
+            this.expectedMessage = expectedMessage;
+            this.flag = flag;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void accept(InNetworkObject inNetworkObject) {
+            var msg = (TestMessage) inNetworkObject.message();
+            if (expectedMessage.equals(msg.msg())) {
+                if (!flag.compareAndSet(false, true)) {
+                    fail();
+                }
+                return;
+            }
+            fail();
+        }
+    }
+
+    private void checkPipelineAfterHandshake(EmbeddedChannel channel) {
+        assertNull(channel.pipeline().get(HandshakeHandler.NAME));
+    }
+
+    private void checkHandshakeNotCompleted(HandshakeManager manager) {
+        CompletableFuture<NettySender> handshakeFuture = manager.handshakeFuture();
+        assertFalse(handshakeFuture.isDone());
+        assertFalse(handshakeFuture.isCompletedExceptionally());
+        assertFalse(handshakeFuture.isCancelled());
+    }
+
+    private void checkHandshakeCompleted(HandshakeManager manager) {
+        CompletableFuture<NettySender> handshakeFuture = manager.handshakeFuture();
+        assertTrue(handshakeFuture.isDone());
+        assertFalse(handshakeFuture.isCompletedExceptionally());
+        assertFalse(handshakeFuture.isCancelled());
+    }
+
+    private void addUnacknowledgedMessages(RecoveryDescriptor recoveryDescriptor) {
+        TestMessage msg = TEST_MESSAGES_FACTORY.testMessage().msg("test").build();
+        recoveryDescriptor.add(new OutNetworkObject(msg, Collections.emptyList()));
+    }
+
+    private void exchangeServerToClient(EmbeddedChannel serverSideChannel, EmbeddedChannel clientSideChannel) {
+        ByteBuf handshakeStartMessage = serverSideChannel.readOutbound();
+        clientSideChannel.writeInbound(handshakeStartMessage);
+    }
+
+    private void exchangeClientToServer(EmbeddedChannel serverSideChannel, EmbeddedChannel clientSideChannel) {
+        ByteBuf handshakeStartMessage = clientSideChannel.readOutbound();
+        serverSideChannel.writeInbound(handshakeStartMessage);
+    }
+
+    private final Consumer<InNetworkObject> noMessageListener = inNetworkObject ->
+            fail("Received an unexpected message: [" + inNetworkObject.message() + "]");
+
+    private EmbeddedChannel setupChannel(HandshakeManager handshakeManager, Consumer<InNetworkObject> messageListener) throws Exception {
+        // Don't register the channel until the pipeline handlers have been added.
+        // Otherwise, events like "channel active" won't be propagated to the handlers.
+        var channel = new EmbeddedChannel(false, false);
+
+        var serializationService = new SerializationService(MESSAGE_REGISTRY, createUserObjectSerializationContext());
+        var sessionSerializationService = new PerSessionSerializationService(serializationService);
+
+        PipelineUtils.setup(channel.pipeline(), sessionSerializationService, handshakeManager, messageListener);
+
+        channel.register();
+
+        return channel;
+    }
+
+    private UserObjectSerializationContext createUserObjectSerializationContext() {
+        var userObjectDescriptorRegistry = new ClassDescriptorRegistry();
+        var userObjectDescriptorFactory = new ClassDescriptorFactory(userObjectDescriptorRegistry);
+
+        var userObjectMarshaller = new DefaultUserObjectMarshaller(userObjectDescriptorRegistry, userObjectDescriptorFactory);
+
+        return new UserObjectSerializationContext(userObjectDescriptorRegistry, userObjectDescriptorFactory,
+                userObjectMarshaller);
+    }
+
+    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(RecoveryDescriptorProvider provider) {
+        return createRecoveryClientHandshakeManager("client", UUID.randomUUID(), provider);
+    }
+
+    private RecoveryClientHandshakeManager createRecoveryClientHandshakeManager(String consistentId, UUID launchId,
+            RecoveryDescriptorProvider provider) {
+        return new RecoveryClientHandshakeManager(launchId, consistentId, CONNECTION_ID, MESSAGE_FACTORY, provider);
+    }
+
+    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(RecoveryDescriptorProvider provider) {
+        return createRecoveryServerHandshakeManager("server", UUID.randomUUID(), provider);
+    }
+
+    private RecoveryServerHandshakeManager createRecoveryServerHandshakeManager(String consistentId, UUID launchId,
+            RecoveryDescriptorProvider provider) {
+        return new RecoveryServerHandshakeManager(launchId, consistentId, MESSAGE_FACTORY, provider);
+    }
+
+    private RecoveryDescriptorProvider createRecoveryDescriptorProvider() {

Review Comment:
   What's the point of this method?
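   The exactly-once scenario exercised by `testExactlyOnce` can be modeled in isolation. The following is a minimal sketch under stated assumptions, not Ignite's implementation: it assumes each message carries a per-connection sequence index, the receiver delivers messages strictly in order, and the reconnect handshake reports how many messages the receiver has already delivered. `ExactlyOnceSketch`, `Sender`, `Receiver`, `transmit` and `acknowledge` are hypothetical names and do not correspond to the `RecoveryDescriptor` API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical toy model of ack-based recovery; not Ignite's RecoveryDescriptor API. */
public class ExactlyOnceSketch {
    /** Sending side: keeps every message until the peer acknowledges it. */
    static final class Sender {
        private final Deque<String> unacked = new ArrayDeque<>();
        /** Sequence index of the head of {@code unacked}. */
        private long firstUnackedIndex = 0;

        void enqueue(String msg) {
            unacked.addLast(msg);
        }

        /** Discards every message whose index is below {@code receivedCount}. */
        void acknowledge(long receivedCount) {
            while (firstUnackedIndex < receivedCount && !unacked.isEmpty()) {
                unacked.removeFirst();
                firstUnackedIndex++;
            }
        }

        /** Sends up to {@code limit} unacknowledged messages, tagged with their indices. */
        void transmit(Receiver receiver, int limit) {
            long index = firstUnackedIndex;
            int sent = 0;
            for (String msg : unacked) {
                if (sent == limit) {
                    break;
                }
                receiver.onMessage(index++, msg);
                sent++;
            }
        }
    }

    /** Receiving side: delivers each index once, in order. */
    static final class Receiver {
        private long receivedCount = 0;
        private final List<String> delivered = new ArrayList<>();

        void onMessage(long index, String msg) {
            if (index < receivedCount) {
                return; // Already delivered before the reconnect: drop the duplicate.
            }
            if (index != receivedCount) {
                throw new IllegalStateException("Gap in sequence at index " + index);
            }
            delivered.add(msg);
            receivedCount++;
        }
    }

    /** Mirrors testExactlyOnce: the ack for "1" is lost, then the peers reconnect. */
    static List<String> run() {
        Sender sender = new Sender();
        Receiver receiver = new Receiver();
        sender.enqueue("1");
        sender.enqueue("2");

        // First connection: "1" is delivered, but its acknowledgement never reaches the sender.
        sender.transmit(receiver, 1);

        // Reconnect: the handshake reports how many messages the receiver has delivered,
        // so the sender discards "1" and resends only "2".
        sender.acknowledge(receiver.receivedCount);
        sender.transmit(receiver, Integer.MAX_VALUE);
        sender.acknowledge(receiver.receivedCount);
        return receiver.delivered;
    }

    /** Same failure, but the sender blindly resends everything it still holds. */
    static List<String> runWithoutHandshakeSync() {
        Sender sender = new Sender();
        Receiver receiver = new Receiver();
        sender.enqueue("1");
        sender.enqueue("2");

        sender.transmit(receiver, 1);
        // "1" arrives again with index 0 and is dropped by the receiver's index check.
        sender.transmit(receiver, Integer.MAX_VALUE);
        return receiver.delivered;
    }

    public static void main(String[] args) {
        System.out.println(run());                     // prints [1, 2]
        System.out.println(runWithoutHandshakeSync()); // prints [1, 2]
    }
}
```

   Both runs deliver `[1, 2]`: in the first, the count reported during the handshake stops the sender from resending "1"; in the second, the receiver's index check drops the duplicate instead.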



-- 
This is an automated message from the Apache Git Service.