rpuch commented on code in PR #2850:
URL: https://github.com/apache/ignite-3/pull/2850#discussion_r1403251519
##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java:
##########
@@ -237,70 +233,131 @@ private void
onHandshakeStartMessage(HandshakeStartMessage message) {
connectionId
);
- while (!descriptor.acquire(ctx)) {
- if (shouldCloseChannel(remoteLaunchId, launchId)) {
- Channel holderChannel = descriptor.holderChannel();
-
- if (holderChannel == null) {
- continue;
- }
+ while (!descriptor.acquire(ctx, localHandshakeCompleteFuture)) {
+ // Don't use the tie-braking logic as this handshake attempt is
late: the competitor has already acquired
+ // recovery descriptors on both sides, so this handshake attempt
must fail regardless of the Tie Breaker's opinion.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to acquire recovery descriptor during
handshake, it is held by: {}.", descriptor.holderDescription());
+ }
- holderChannel.close().awaitUninterruptibly();
- } else {
- if (LOG.isInfoEnabled()) {
- LOG.info("Failed to acquire recovery descriptor during
handshake, it is held by: {}", descriptor.holderDescription());
- }
+ DescriptorAcquiry competitorAcquiry = descriptor.holder();
+ if (competitorAcquiry == null) {
+ continue;
+ }
- handshakeCompleteFuture.completeExceptionally(new
ChannelAlreadyExistsException(remoteConsistentId));
+ // Complete our master future with the competitor's future. After
this our local future has no effect on the final result
+ // of this handshake.
+
completeMasterFutureWithCompetitorHandshakeFuture(competitorAcquiry);
- return;
- }
+ return;
}
this.recoveryDescriptor = descriptor;
handshake(this.recoveryDescriptor);
}
+ private void
completeMasterFutureWithCompetitorHandshakeFuture(DescriptorAcquiry
competitorAcquiry) {
+
masterHandshakeCompleteFuture.complete(competitorAcquiry.handshakeCompleteFuture());
+ localHandshakeCompleteFuture.completeExceptionally(
+ new HandshakeException("Stepping aside to allow an incoming
handshake from " + remoteConsistentId + " finish.")
+ );
+ }
+
private void handleStaleServerId(HandshakeStartMessage msg) {
- String reason = msg.consistentId() + ":" + msg.launchId() + " is
stale, server should be restarted so that clients can connect";
+ String message = msg.consistentId() + ":" + msg.launchId() + " is
stale, server should be restarted so that clients can connect";
HandshakeRejectedMessage rejectionMessage =
MESSAGE_FACTORY.handshakeRejectedMessage()
- .critical(true)
- .reason(reason)
+ .reasonString(HandshakeRejectionReason.STALE_LAUNCH_ID.name())
+ .message(message)
.build();
- sendHandshakeRejectedMessage(rejectionMessage, reason);
+ sendHandshakeRejectedMessage(rejectionMessage, message);
}
private void
handleRefusalToEstablishConnectionDueToStopping(HandshakeStartMessage msg) {
- String reason = msg.consistentId() + ":" + msg.launchId() + " tried to
establish a connection with " + consistentId
+ String message = msg.consistentId() + ":" + msg.launchId() + " tried
to establish a connection with " + consistentId
+ ", but it's stopping";
HandshakeRejectedMessage rejectionMessage =
MESSAGE_FACTORY.handshakeRejectedMessage()
- .critical(false)
- .reason(reason)
+ .reasonString(HandshakeRejectionReason.STOPPING.name())
+ .message(message)
.build();
- sendHandshakeRejectedMessage(rejectionMessage, reason);
+ sendHandshakeRejectedMessage(rejectionMessage, message);
+ }
+
+ private void onHandshakeRejectedMessage(HandshakeRejectedMessage msg) {
+ boolean ignorable = stopping.get() || !msg.reason().critical();
+
+ if (ignorable) {
+ LOG.debug("Handshake rejected by server: {}", msg.message());
+ } else {
+ LOG.warn("Handshake rejected by server: {}", msg.message());
+ }
+
+ if (msg.reason() == HandshakeRejectionReason.CLINCH) {
+ giveUpClinch();
+ } else {
+ localHandshakeCompleteFuture.completeExceptionally(new
HandshakeException(msg.message()));
+ }
+
+ if (!ignorable) {
+ // TODO: IGNITE-16899 Perhaps we need to fail the node by
FailureHandler
+ failureHandler.handleFailure(new IgniteException("Handshake
rejected by server: " + msg.message()));
+ }
+ }
+
+ private void giveUpClinch() {
+ RecoveryDescriptor descriptor =
recoveryDescriptorProvider.getRecoveryDescriptor(
+ remoteConsistentId,
+ remoteLaunchId,
+ connectionId
+ );
+
+ DescriptorAcquiry myAcquiry = descriptor.holder();
+ assert myAcquiry != null;
Review Comment:
We can't easily do anything here: the protocol seems to be designed
around trusting the other side. If this has to change, we'd need to redesign the
protocol, but I think this should be solved by other means (firewall and TLS
authentication).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]