rpuch commented on code in PR #2850:
URL: https://github.com/apache/ignite-3/pull/2850#discussion_r1403251519


##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java:
##########
@@ -237,70 +233,131 @@ private void 
onHandshakeStartMessage(HandshakeStartMessage message) {
                 connectionId
         );
 
-        while (!descriptor.acquire(ctx)) {
-            if (shouldCloseChannel(remoteLaunchId, launchId)) {
-                Channel holderChannel = descriptor.holderChannel();
-
-                if (holderChannel == null) {
-                    continue;
-                }
+        while (!descriptor.acquire(ctx, localHandshakeCompleteFuture)) {
+            // Don't use the tie-braking logic as this handshake attempt is 
late: the competitor has already acquired
+            // recovery descriptors on both sides, so this handshake attempt 
must fail regardless of the Tie Breaker's opinion.
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Failed to acquire recovery descriptor during 
handshake, it is held by: {}.", descriptor.holderDescription());
+            }
 
-                holderChannel.close().awaitUninterruptibly();
-            } else {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("Failed to acquire recovery descriptor during 
handshake, it is held by: {}", descriptor.holderDescription());
-                }
+            DescriptorAcquiry competitorAcquiry = descriptor.holder();
+            if (competitorAcquiry == null) {
+                continue;
+            }
 
-                handshakeCompleteFuture.completeExceptionally(new 
ChannelAlreadyExistsException(remoteConsistentId));
+            // Complete our master future with the competitor's future. After 
this our local future has no effect on the final result
+            // of this handshake.
+            
completeMasterFutureWithCompetitorHandshakeFuture(competitorAcquiry);
 
-                return;
-            }
+            return;
         }
 
         this.recoveryDescriptor = descriptor;
 
         handshake(this.recoveryDescriptor);
     }
 
+    private void 
completeMasterFutureWithCompetitorHandshakeFuture(DescriptorAcquiry 
competitorAcquiry) {
+        
masterHandshakeCompleteFuture.complete(competitorAcquiry.handshakeCompleteFuture());
+        localHandshakeCompleteFuture.completeExceptionally(
+                new HandshakeException("Stepping aside to allow an incoming 
handshake from " + remoteConsistentId + " finish.")
+        );
+    }
+
     private void handleStaleServerId(HandshakeStartMessage msg) {
-        String reason = msg.consistentId() + ":" + msg.launchId() + " is 
stale, server should be restarted so that clients can connect";
+        String message = msg.consistentId() + ":" + msg.launchId() + " is 
stale, server should be restarted so that clients can connect";
         HandshakeRejectedMessage rejectionMessage = 
MESSAGE_FACTORY.handshakeRejectedMessage()
-                .critical(true)
-                .reason(reason)
+                .reasonString(HandshakeRejectionReason.STALE_LAUNCH_ID.name())
+                .message(message)
                 .build();
 
-        sendHandshakeRejectedMessage(rejectionMessage, reason);
+        sendHandshakeRejectedMessage(rejectionMessage, message);
     }
 
     private void 
handleRefusalToEstablishConnectionDueToStopping(HandshakeStartMessage msg) {
-        String reason = msg.consistentId() + ":" + msg.launchId() + " tried to 
establish a connection with " + consistentId
+        String message = msg.consistentId() + ":" + msg.launchId() + " tried 
to establish a connection with " + consistentId
                 + ", but it's stopping";
         HandshakeRejectedMessage rejectionMessage = 
MESSAGE_FACTORY.handshakeRejectedMessage()
-                .critical(false)
-                .reason(reason)
+                .reasonString(HandshakeRejectionReason.STOPPING.name())
+                .message(message)
                 .build();
 
-        sendHandshakeRejectedMessage(rejectionMessage, reason);
+        sendHandshakeRejectedMessage(rejectionMessage, message);
+    }
+
+    private void onHandshakeRejectedMessage(HandshakeRejectedMessage msg) {
+        boolean ignorable = stopping.get() || !msg.reason().critical();
+
+        if (ignorable) {
+            LOG.debug("Handshake rejected by server: {}", msg.message());
+        } else {
+            LOG.warn("Handshake rejected by server: {}", msg.message());
+        }
+
+        if (msg.reason() == HandshakeRejectionReason.CLINCH) {
+            giveUpClinch();
+        } else {
+            localHandshakeCompleteFuture.completeExceptionally(new 
HandshakeException(msg.message()));
+        }
+
+        if (!ignorable) {
+            // TODO: IGNITE-16899 Perhaps we need to fail the node by 
FailureHandler
+            failureHandler.handleFailure(new IgniteException("Handshake 
rejected by server: " + msg.message()));
+        }
+    }
+
+    private void giveUpClinch() {
+        RecoveryDescriptor descriptor = 
recoveryDescriptorProvider.getRecoveryDescriptor(
+                remoteConsistentId,
+                remoteLaunchId,
+                connectionId
+        );
+
+        DescriptorAcquiry myAcquiry = descriptor.holder();
+        assert myAcquiry != null;

Review Comment:
   We cannot 'easily' do anything here, and the protocol seems to be designed 
around trust in the other side. If this has to change, we'll need to redesign 
the protocol, but I think this should be solved by other means (firewall and 
TLS auth).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to