sergey-chugunov-1985 commented on code in PR #2850:
URL: https://github.com/apache/ignite-3/pull/2850#discussion_r1401875043
##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java:
##########
@@ -74,7 +76,13 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
private final short connectionId;
/** Handshake completion future. */
- private final CompletableFuture<NettySender> handshakeCompleteFuture = new
CompletableFuture<>();
+ private final CompletableFuture<NettySender> localHandshakeCompleteFuture
= new CompletableFuture<>();
Review Comment:
In the case of ClientHandshakeManager, we could call it `outgoing` instead
of `local`.
##########
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeManager.java:
##########
@@ -48,9 +49,20 @@ default void onConnectionOpen() {
void onMessage(NetworkMessage message);
/**
- * Returns future that represents the handshake operation.
+ * Returns local future that represents the handshake operation. This is
the future that
+ * gets completed when the handshake itself terminates either successfully
or with an exception.
+ * This is used to complete the current handshake; to get the final
outcome of the connection attempt
+ * please use {@link #finalHandshakeFuture()}.
*
- * @return Future that represents the handshake operation.
+ * @return Local future that represents the handshake operation.
*/
- CompletableFuture<NettySender> handshakeFuture();
+ CompletableFuture<NettySender> localHandshakeFuture();
+
+ /**
+ * Returns final future that represents the handshake operation. This
represents completion of either
+ * current handshake or the inverse handshake if it wins (and the current
one loses).
+ *
+ * @return Final future that represents the handshake operation.
+ */
+ CompletionStage<NettySender> finalHandshakeFuture();
Review Comment:
How about a different name, such as `globalHandshakeFuture`?
##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java:
##########
@@ -237,70 +233,131 @@ private void
onHandshakeStartMessage(HandshakeStartMessage message) {
connectionId
);
- while (!descriptor.acquire(ctx)) {
- if (shouldCloseChannel(remoteLaunchId, launchId)) {
- Channel holderChannel = descriptor.holderChannel();
-
- if (holderChannel == null) {
- continue;
- }
+ while (!descriptor.acquire(ctx, localHandshakeCompleteFuture)) {
+ // Don't use the tie-braking logic as this handshake attempt is
late: the competitor has already acquired
+ // recovery descriptors on both sides, so this handshake attempt
must fail regardless of the Tie Breaker's opinion.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to acquire recovery descriptor during
handshake, it is held by: {}.", descriptor.holderDescription());
+ }
- holderChannel.close().awaitUninterruptibly();
- } else {
- if (LOG.isInfoEnabled()) {
- LOG.info("Failed to acquire recovery descriptor during
handshake, it is held by: {}", descriptor.holderDescription());
- }
+ DescriptorAcquiry competitorAcquiry = descriptor.holder();
+ if (competitorAcquiry == null) {
+ continue;
+ }
- handshakeCompleteFuture.completeExceptionally(new
ChannelAlreadyExistsException(remoteConsistentId));
+ // Complete our master future with the competitor's future. After
this our local future has no effect on the final result
+ // of this handshake.
+
completeMasterFutureWithCompetitorHandshakeFuture(competitorAcquiry);
- return;
- }
+ return;
}
this.recoveryDescriptor = descriptor;
handshake(this.recoveryDescriptor);
}
+ private void
completeMasterFutureWithCompetitorHandshakeFuture(DescriptorAcquiry
competitorAcquiry) {
+
masterHandshakeCompleteFuture.complete(competitorAcquiry.handshakeCompleteFuture());
+ localHandshakeCompleteFuture.completeExceptionally(
+ new HandshakeException("Stepping aside to allow an incoming
handshake from " + remoteConsistentId + " finish.")
+ );
+ }
+
private void handleStaleServerId(HandshakeStartMessage msg) {
- String reason = msg.consistentId() + ":" + msg.launchId() + " is
stale, server should be restarted so that clients can connect";
+ String message = msg.consistentId() + ":" + msg.launchId() + " is
stale, server should be restarted so that clients can connect";
HandshakeRejectedMessage rejectionMessage =
MESSAGE_FACTORY.handshakeRejectedMessage()
- .critical(true)
- .reason(reason)
+ .reasonString(HandshakeRejectionReason.STALE_LAUNCH_ID.name())
+ .message(message)
.build();
- sendHandshakeRejectedMessage(rejectionMessage, reason);
+ sendHandshakeRejectedMessage(rejectionMessage, message);
}
private void
handleRefusalToEstablishConnectionDueToStopping(HandshakeStartMessage msg) {
- String reason = msg.consistentId() + ":" + msg.launchId() + " tried to
establish a connection with " + consistentId
+ String message = msg.consistentId() + ":" + msg.launchId() + " tried
to establish a connection with " + consistentId
+ ", but it's stopping";
HandshakeRejectedMessage rejectionMessage =
MESSAGE_FACTORY.handshakeRejectedMessage()
- .critical(false)
- .reason(reason)
+ .reasonString(HandshakeRejectionReason.STOPPING.name())
+ .message(message)
.build();
- sendHandshakeRejectedMessage(rejectionMessage, reason);
+ sendHandshakeRejectedMessage(rejectionMessage, message);
+ }
+
+ private void onHandshakeRejectedMessage(HandshakeRejectedMessage msg) {
+ boolean ignorable = stopping.get() || !msg.reason().critical();
+
+ if (ignorable) {
+ LOG.debug("Handshake rejected by server: {}", msg.message());
Review Comment:
Maybe check whether debug logging is enabled first (`LOG.isDebugEnabled()`)?
##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java:
##########
@@ -237,70 +233,131 @@ private void
onHandshakeStartMessage(HandshakeStartMessage message) {
connectionId
);
- while (!descriptor.acquire(ctx)) {
- if (shouldCloseChannel(remoteLaunchId, launchId)) {
- Channel holderChannel = descriptor.holderChannel();
-
- if (holderChannel == null) {
- continue;
- }
+ while (!descriptor.acquire(ctx, localHandshakeCompleteFuture)) {
+ // Don't use the tie-braking logic as this handshake attempt is
late: the competitor has already acquired
+ // recovery descriptors on both sides, so this handshake attempt
must fail regardless of the Tie Breaker's opinion.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to acquire recovery descriptor during
handshake, it is held by: {}.", descriptor.holderDescription());
+ }
- holderChannel.close().awaitUninterruptibly();
- } else {
- if (LOG.isInfoEnabled()) {
- LOG.info("Failed to acquire recovery descriptor during
handshake, it is held by: {}", descriptor.holderDescription());
- }
+ DescriptorAcquiry competitorAcquiry = descriptor.holder();
+ if (competitorAcquiry == null) {
+ continue;
+ }
- handshakeCompleteFuture.completeExceptionally(new
ChannelAlreadyExistsException(remoteConsistentId));
+ // Complete our master future with the competitor's future. After
this our local future has no effect on the final result
+ // of this handshake.
+
completeMasterFutureWithCompetitorHandshakeFuture(competitorAcquiry);
- return;
- }
+ return;
}
this.recoveryDescriptor = descriptor;
handshake(this.recoveryDescriptor);
}
+ private void
completeMasterFutureWithCompetitorHandshakeFuture(DescriptorAcquiry
competitorAcquiry) {
+
masterHandshakeCompleteFuture.complete(competitorAcquiry.handshakeCompleteFuture());
+ localHandshakeCompleteFuture.completeExceptionally(
+ new HandshakeException("Stepping aside to allow an incoming
handshake from " + remoteConsistentId + " finish.")
Review Comment:
`to allow ... **to** finish` - I think we need an additional particle here
from a grammatical point of view.
##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java:
##########
@@ -74,7 +76,13 @@ public class RecoveryClientHandshakeManager implements
HandshakeManager {
private final short connectionId;
/** Handshake completion future. */
- private final CompletableFuture<NettySender> handshakeCompleteFuture = new
CompletableFuture<>();
+ private final CompletableFuture<NettySender> localHandshakeCompleteFuture
= new CompletableFuture<>();
+
+ /**
+ * Master future used to complete the handshake either with the results of
this handshake of the competing one
+ * (in the opposite direction), if it wins.
+ */
+ private final CompletableFuture<CompletionStage<NettySender>>
masterHandshakeCompleteFuture = new CompletableFuture<>();
Review Comment:
And here it could be `final`, `terminal` or `resulting` - WDYT?
##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java:
##########
@@ -138,36 +140,49 @@ public String toString() {
* @param ctx Channel handler context.
*/
public void release(ChannelHandlerContext ctx) {
- channelHolder.compareAndSet(ctx.channel(), null);
+ DescriptorAcquiry oldAcquiry = channelHolder.getAndUpdate(acquiry -> {
+ if (acquiry != null && acquiry.channel() == ctx.channel()) {
+ return null;
+ }
+
+ return acquiry;
+ });
+
+ if (oldAcquiry != null && oldAcquiry.channel() == ctx.channel()) {
+ // We have successfully released the descriptor.
+ // Let's mark the clinch resolved just in case.
+ oldAcquiry.markClinchResolved();
Review Comment:
Can we check whether the acquiry is in the clinch state? AFAIK that would be an invalid
state for the acquiry here, so it deserves to be logged for further
investigation.
##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/message/HandshakeRejectionReason.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery.message;
+
+/**
+ * Reason for handshake rejection.
+ */
+public enum HandshakeRejectionReason {
+ /** The sender is stopping. */
+ STOPPING,
+ /**
+ * The sender has detected that the counterpart launch ID is stale (was
earlier used to establish a connection).
+ * After this is received it makes no sense to retry connections with same
node identity (launch ID must be changed
+ * to make a retry).
+ */
+ STALE_LAUNCH_ID,
+ /** The sender has detected a clinch and decided to terminate this
handshake in favor of the competitor. */
+ CLINCH;
+
+ /**
+ * Returns {@code true} iff the rejection is not expected and should be
treated as a critical failure (requiring
+ * the rejected node to restart).
+ */
+ public boolean critical() {
Review Comment:
How about renaming `critical` to something like `hazardous`, to make it
clearer that such a rejection means the node should be handed over to the FailureHandler?
##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java:
##########
@@ -237,70 +233,131 @@ private void
onHandshakeStartMessage(HandshakeStartMessage message) {
connectionId
);
- while (!descriptor.acquire(ctx)) {
- if (shouldCloseChannel(remoteLaunchId, launchId)) {
- Channel holderChannel = descriptor.holderChannel();
-
- if (holderChannel == null) {
- continue;
- }
+ while (!descriptor.acquire(ctx, localHandshakeCompleteFuture)) {
+ // Don't use the tie-braking logic as this handshake attempt is
late: the competitor has already acquired
+ // recovery descriptors on both sides, so this handshake attempt
must fail regardless of the Tie Breaker's opinion.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to acquire recovery descriptor during
handshake, it is held by: {}.", descriptor.holderDescription());
+ }
- holderChannel.close().awaitUninterruptibly();
- } else {
- if (LOG.isInfoEnabled()) {
- LOG.info("Failed to acquire recovery descriptor during
handshake, it is held by: {}", descriptor.holderDescription());
- }
+ DescriptorAcquiry competitorAcquiry = descriptor.holder();
+ if (competitorAcquiry == null) {
+ continue;
+ }
- handshakeCompleteFuture.completeExceptionally(new
ChannelAlreadyExistsException(remoteConsistentId));
+ // Complete our master future with the competitor's future. After
this our local future has no effect on the final result
+ // of this handshake.
+
completeMasterFutureWithCompetitorHandshakeFuture(competitorAcquiry);
- return;
- }
+ return;
}
this.recoveryDescriptor = descriptor;
handshake(this.recoveryDescriptor);
}
+ private void
completeMasterFutureWithCompetitorHandshakeFuture(DescriptorAcquiry
competitorAcquiry) {
+
masterHandshakeCompleteFuture.complete(competitorAcquiry.handshakeCompleteFuture());
+ localHandshakeCompleteFuture.completeExceptionally(
+ new HandshakeException("Stepping aside to allow an incoming
handshake from " + remoteConsistentId + " finish.")
+ );
+ }
+
private void handleStaleServerId(HandshakeStartMessage msg) {
- String reason = msg.consistentId() + ":" + msg.launchId() + " is
stale, server should be restarted so that clients can connect";
+ String message = msg.consistentId() + ":" + msg.launchId() + " is
stale, server should be restarted so that clients can connect";
HandshakeRejectedMessage rejectionMessage =
MESSAGE_FACTORY.handshakeRejectedMessage()
- .critical(true)
- .reason(reason)
+ .reasonString(HandshakeRejectionReason.STALE_LAUNCH_ID.name())
+ .message(message)
.build();
- sendHandshakeRejectedMessage(rejectionMessage, reason);
+ sendHandshakeRejectedMessage(rejectionMessage, message);
}
private void
handleRefusalToEstablishConnectionDueToStopping(HandshakeStartMessage msg) {
- String reason = msg.consistentId() + ":" + msg.launchId() + " tried to
establish a connection with " + consistentId
+ String message = msg.consistentId() + ":" + msg.launchId() + " tried
to establish a connection with " + consistentId
+ ", but it's stopping";
HandshakeRejectedMessage rejectionMessage =
MESSAGE_FACTORY.handshakeRejectedMessage()
- .critical(false)
- .reason(reason)
+ .reasonString(HandshakeRejectionReason.STOPPING.name())
+ .message(message)
.build();
- sendHandshakeRejectedMessage(rejectionMessage, reason);
+ sendHandshakeRejectedMessage(rejectionMessage, message);
+ }
+
+ private void onHandshakeRejectedMessage(HandshakeRejectedMessage msg) {
+ boolean ignorable = stopping.get() || !msg.reason().critical();
+
+ if (ignorable) {
+ LOG.debug("Handshake rejected by server: {}", msg.message());
+ } else {
+ LOG.warn("Handshake rejected by server: {}", msg.message());
+ }
+
+ if (msg.reason() == HandshakeRejectionReason.CLINCH) {
+ giveUpClinch();
+ } else {
+ localHandshakeCompleteFuture.completeExceptionally(new
HandshakeException(msg.message()));
+ }
+
+ if (!ignorable) {
+ // TODO: IGNITE-16899 Perhaps we need to fail the node by
FailureHandler
+ failureHandler.handleFailure(new IgniteException("Handshake
rejected by server: " + msg.message()));
+ }
+ }
+
+ private void giveUpClinch() {
+ RecoveryDescriptor descriptor =
recoveryDescriptorProvider.getRecoveryDescriptor(
+ remoteConsistentId,
+ remoteLaunchId,
+ connectionId
+ );
+
+ DescriptorAcquiry myAcquiry = descriptor.holder();
+ assert myAcquiry != null;
Review Comment:
I may be a bit paranoid here, but I would really prefer to have some sort of
IDs on recovery descriptors. I cannot imagine a scenario in which we get a
HandshakeRejectedMessage out of thin air, but if we do, we'll fail on these
asserts immediately.
Alternatively, such messages could be crafted maliciously (e.g. to fail a node), so
this code could be a security vulnerability.
What do you think about these ideas? However, this is not a blocker for this
improvement right now.
##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java:
##########
@@ -237,70 +233,131 @@ private void
onHandshakeStartMessage(HandshakeStartMessage message) {
connectionId
);
- while (!descriptor.acquire(ctx)) {
- if (shouldCloseChannel(remoteLaunchId, launchId)) {
- Channel holderChannel = descriptor.holderChannel();
-
- if (holderChannel == null) {
- continue;
- }
+ while (!descriptor.acquire(ctx, localHandshakeCompleteFuture)) {
+ // Don't use the tie-braking logic as this handshake attempt is
late: the competitor has already acquired
+ // recovery descriptors on both sides, so this handshake attempt
must fail regardless of the Tie Breaker's opinion.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to acquire recovery descriptor during
handshake, it is held by: {}.", descriptor.holderDescription());
+ }
- holderChannel.close().awaitUninterruptibly();
- } else {
- if (LOG.isInfoEnabled()) {
- LOG.info("Failed to acquire recovery descriptor during
handshake, it is held by: {}", descriptor.holderDescription());
- }
+ DescriptorAcquiry competitorAcquiry = descriptor.holder();
+ if (competitorAcquiry == null) {
+ continue;
+ }
- handshakeCompleteFuture.completeExceptionally(new
ChannelAlreadyExistsException(remoteConsistentId));
+ // Complete our master future with the competitor's future. After
this our local future has no effect on the final result
+ // of this handshake.
+
completeMasterFutureWithCompetitorHandshakeFuture(competitorAcquiry);
- return;
- }
+ return;
}
this.recoveryDescriptor = descriptor;
handshake(this.recoveryDescriptor);
}
+ private void
completeMasterFutureWithCompetitorHandshakeFuture(DescriptorAcquiry
competitorAcquiry) {
+
masterHandshakeCompleteFuture.complete(competitorAcquiry.handshakeCompleteFuture());
+ localHandshakeCompleteFuture.completeExceptionally(
+ new HandshakeException("Stepping aside to allow an incoming
handshake from " + remoteConsistentId + " finish.")
+ );
+ }
+
private void handleStaleServerId(HandshakeStartMessage msg) {
- String reason = msg.consistentId() + ":" + msg.launchId() + " is
stale, server should be restarted so that clients can connect";
+ String message = msg.consistentId() + ":" + msg.launchId() + " is
stale, server should be restarted so that clients can connect";
HandshakeRejectedMessage rejectionMessage =
MESSAGE_FACTORY.handshakeRejectedMessage()
- .critical(true)
- .reason(reason)
+ .reasonString(HandshakeRejectionReason.STALE_LAUNCH_ID.name())
+ .message(message)
.build();
- sendHandshakeRejectedMessage(rejectionMessage, reason);
+ sendHandshakeRejectedMessage(rejectionMessage, message);
}
private void
handleRefusalToEstablishConnectionDueToStopping(HandshakeStartMessage msg) {
- String reason = msg.consistentId() + ":" + msg.launchId() + " tried to
establish a connection with " + consistentId
+ String message = msg.consistentId() + ":" + msg.launchId() + " tried
to establish a connection with " + consistentId
+ ", but it's stopping";
HandshakeRejectedMessage rejectionMessage =
MESSAGE_FACTORY.handshakeRejectedMessage()
- .critical(false)
- .reason(reason)
+ .reasonString(HandshakeRejectionReason.STOPPING.name())
+ .message(message)
.build();
- sendHandshakeRejectedMessage(rejectionMessage, reason);
+ sendHandshakeRejectedMessage(rejectionMessage, message);
+ }
+
+ private void onHandshakeRejectedMessage(HandshakeRejectedMessage msg) {
+ boolean ignorable = stopping.get() || !msg.reason().critical();
+
+ if (ignorable) {
+ LOG.debug("Handshake rejected by server: {}", msg.message());
+ } else {
+ LOG.warn("Handshake rejected by server: {}", msg.message());
+ }
+
+ if (msg.reason() == HandshakeRejectionReason.CLINCH) {
+ giveUpClinch();
+ } else {
+ localHandshakeCompleteFuture.completeExceptionally(new
HandshakeException(msg.message()));
+ }
+
+ if (!ignorable) {
+ // TODO: IGNITE-16899 Perhaps we need to fail the node by
FailureHandler
Review Comment:
Why a TODO? The FailureHandler (FH) is already called here and should fail the node.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]