sashapolo commented on code in PR #3009:
URL: https://github.com/apache/ignite-3/pull/3009#discussion_r1445828142


##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/OutgoingAcknowledgementSilencer.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import 
org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * {@link io.netty.channel.ChannelOutboundHandler} that drops outgoing {@link 
AcknowledgementMessage}s.
+ */
+@Sharable
+public class OutgoingAcknowledgementSilencer extends 
ChannelOutboundHandlerAdapter {
+    /** Name of this handler. */
+    static final String NAME = "acknowledgement-silencer";
+
+    private volatile boolean silenceAcks = true;
+
+    private final AtomicInteger addedCount = new AtomicInteger();

Review Comment:
   I would suggest using a `Phaser` instead.



##########
modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java:
##########
@@ -434,6 +434,9 @@ public static class Network {
 
         /** Port is in use. */
         public static final int PORT_IN_USE_ERR = 
NETWORK_ERR_GROUP.registerErrorCode((short) 2);
+
+        /** Recipient node has left the physical topology. */
+        public static final int RECIPIENT_LEFT_ERR = 
NETWORK_ERR_GROUP.registerErrorCode((short) 5);

Review Comment:
   Why 5?



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java:
##########
@@ -246,8 +249,8 @@ public void testCanReconnectAfterFail() throws Exception {
 
         assertThrows(ClosedChannelException.class, () -> {
             try {
-                finalSender.send(new OutNetworkObject(testMessage, 
Collections.emptyList())).get(3, TimeUnit.SECONDS);
-            } catch (Exception e) {
+                finalSender.send(new OutNetworkObject(testMessage, 
emptyList())).get(3, TimeUnit.SECONDS);
+            } catch (ExecutionException e) {

Review Comment:
   I believe there is an `assertThrowsWithCause` helper (or something similar) that could be used here.



##########
modules/arch-test/src/test/java/org/apache/ignite/internal/IgniteExceptionArchTest.java:
##########
@@ -89,6 +90,7 @@ public void check(JavaClass javaClass, ConditionEvents 
conditionEvents) {
         exclusions.add(NoRowSetExpectedException.class.getCanonicalName());
         exclusions.add(InvalidCredentialsException.class.getCanonicalName());
         
exclusions.add(UnsupportedAuthenticationTypeException.class.getCanonicalName());
+        exclusions.add(RecipientLeftException.class.getCanonicalName());

Review Comment:
   Not related to this PR, but can this initialization block be replaced with 
`Set.of`?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -115,6 +128,8 @@ public class ConnectionManager implements 
ChannelCreationListener {
     /** Network Configuration. */
     private final NetworkView networkConfiguration;
 
+    private final ExecutorService connectionMaintenanceExecutor;

Review Comment:
   Please add a javadoc



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -388,20 +432,38 @@ public void stop() {
             return;
         }
 
-        Stream<CompletableFuture<Void>> stream = Stream.concat(Stream.concat(
+        assert stopping.get();
+
+        Stream<CompletableFuture<Void>> stopFutures = Stream.concat(
+                Stream.concat(
                     clients.values().stream().map(NettyClient::stop),
                     Stream.of(server.stop())
                 ),
                 channels.values().stream().map(NettySender::closeAsync)
         );
+        stopFutures = Stream.concat(stopFutures, 
Stream.of(disposeDescriptors()));
 
-        CompletableFuture<Void> stopFut = 
CompletableFuture.allOf(stream.toArray(CompletableFuture<?>[]::new));
+        CompletableFuture<Void> finalStopFuture = 
allOf(stopFutures.toArray(CompletableFuture<?>[]::new));
 
         try {
-            stopFut.join();
+            finalStopFuture.join();
         } catch (Exception e) {
             LOG.warn("Failed to stop connection manager [reason={}]", 
e.getMessage());
         }
+
+        IgniteUtils.shutdownAndAwaitTermination(connectionMaintenanceExecutor, 
10, TimeUnit.SECONDS);
+    }
+
+    private CompletableFuture<Void> disposeDescriptors() {
+        Exception exceptionToFailSendFutures = new NodeStoppingException();
+
+        Collection<RecoveryDescriptor> descriptors = 
descriptorProvider.getAllRecoveryDescriptors();
+        List<CompletableFuture<Void>> disposeFutures = new 
ArrayList<>(descriptors.size());

Review Comment:
   Can this be an array in the first place?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -388,20 +432,38 @@ public void stop() {
             return;
         }
 
-        Stream<CompletableFuture<Void>> stream = Stream.concat(Stream.concat(
+        assert stopping.get();
+
+        Stream<CompletableFuture<Void>> stopFutures = Stream.concat(

Review Comment:
   Can we replace this stream with a list and simply add futures to it? The current 
stream usage is starting to look a bit messy.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java:
##########
@@ -34,14 +39,26 @@ public class DefaultRecoveryDescriptorProvider implements 
RecoveryDescriptorProv
     /** Recovery descriptors. */
     private final Map<ChannelKey, RecoveryDescriptor> recoveryDescriptors = 
new ConcurrentHashMap<>();
 
-    /** {@inheritDoc} */
     @Override
     public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID 
launchId, short connectionIndex) {
         var key = new ChannelKey(consistentId, launchId, connectionIndex);
 
         return recoveryDescriptors.computeIfAbsent(key, channelKey -> new 
RecoveryDescriptor(DEFAULT_QUEUE_LIMIT));
     }
 
+    @Override
+    public Collection<RecoveryDescriptor> 
getRecoveryDescriptorsByLaunchId(UUID launchId) {
+        return recoveryDescriptors.entrySet().stream()
+                .filter(entry -> entry.getKey().launchId.equals(launchId))
+                .map(Entry::getValue)
+                .collect(toList());
+    }
+
+    @Override
+    public Collection<RecoveryDescriptor> getAllRecoveryDescriptors() {
+        return new ArrayList<>(recoveryDescriptors.values());

Review Comment:
   I think that `List.copyOf` looks better here.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -500,19 +562,77 @@ public void initiateStopping() {
 
     /**
      * Closes physical connections with an Ignite node identified by the given 
ID (it's not consistentId,
-     * it's ID that gets regenerated at each node restart).
+     * it's ID that gets regenerated at each node restart) and recovery 
descriptors corresponding to it.
      *
-     * @param id ID of the node.
+     * @param id ID of the node (it must have already left the topology).
      */
-    public void closeConnectionsWith(String id) {
+    public void handleNodeLeft(String id) {
+        // We rely on the fact that the node with the given ID has already 
left the physical topology.
+        assert staleIdDetector.isIdStale(id) : id + " is not stale yet";
+
+        // TODO: IGNITE-21207 - remove descriptors for good.
+
+        connectionMaintenanceExecutor.submit(
+                () -> closeChannelsWith(id).whenCompleteAsync((res, ex) -> {
+                    // Closing descriptors separately (as some of them might 
not have an operating channel attached, but they
+                    // still might have unacknowledged messages/futures).
+                    disposeRecoveryDescriptorsOfLeftNode(id);
+                }, connectionMaintenanceExecutor)
+        );
+    }
+
+    private CompletableFuture<Void> closeChannelsWith(String id) {
         List<Entry<ConnectorKey<String>, NettySender>> entriesToRemove = 
channels.entrySet().stream()
                 .filter(entry -> entry.getValue().launchId().equals(id))
                 .collect(toList());
 
+        List<CompletableFuture<Void>> closeFutures = new ArrayList<>();

Review Comment:
   Can this be an array?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java:
##########
@@ -165,14 +165,14 @@ public void onInit(ChannelHandlerContext handlerContext) {
     /** {@inheritDoc} */
     @Override
     public void onMessage(NetworkMessage message) {
-        if (message instanceof HandshakeStartMessage) {

Review Comment:
   Why did you change the order here?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java:
##########
@@ -128,14 +161,13 @@ public long onReceive() {
         return receivedCount;
     }
 
-    /** {@inheritDoc} */
     @Override
     public String toString() {
         return S.toString(RecoveryDescriptor.class, this);
     }
 
     /**
-     * Release this descriptor.
+     * Releases this descriptor if it's acquired by the given channel, 
otherwise does nothing.

Review Comment:
   ```suggestion
        * Releases this descriptor if it's been acquired by the given channel, 
otherwise does nothing.
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -180,6 +195,15 @@ public ConnectionManager(
         );
 
         this.clientBootstrap = bootstrapFactory.createClientBootstrap();
+
+        connectionMaintenanceExecutor = new ThreadPoolExecutor(

Review Comment:
   Please add a small comment about why we are not using a 
`singleThreadExecutor` here



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -500,19 +562,77 @@ public void initiateStopping() {
 
     /**
      * Closes physical connections with an Ignite node identified by the given 
ID (it's not consistentId,
-     * it's ID that gets regenerated at each node restart).
+     * it's ID that gets regenerated at each node restart) and recovery 
descriptors corresponding to it.
      *
-     * @param id ID of the node.
+     * @param id ID of the node (it must have already left the topology).
      */
-    public void closeConnectionsWith(String id) {
+    public void handleNodeLeft(String id) {
+        // We rely on the fact that the node with the given ID has already 
left the physical topology.
+        assert staleIdDetector.isIdStale(id) : id + " is not stale yet";
+
+        // TODO: IGNITE-21207 - remove descriptors for good.
+
+        connectionMaintenanceExecutor.submit(
+                () -> closeChannelsWith(id).whenCompleteAsync((res, ex) -> {
+                    // Closing descriptors separately (as some of them might 
not have an operating channel attached, but they
+                    // still might have unacknowledged messages/futures).
+                    disposeRecoveryDescriptorsOfLeftNode(id);
+                }, connectionMaintenanceExecutor)
+        );
+    }
+
+    private CompletableFuture<Void> closeChannelsWith(String id) {
         List<Entry<ConnectorKey<String>, NettySender>> entriesToRemove = 
channels.entrySet().stream()
                 .filter(entry -> entry.getValue().launchId().equals(id))
                 .collect(toList());
 
+        List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
         for (Entry<ConnectorKey<String>, NettySender> entry : entriesToRemove) 
{
-            entry.getValue().close();
+            closeFutures.add(entry.getValue().closeAsync());
 
             channels.remove(entry.getKey());
         }
+
+        return allOf(closeFutures.toArray(CompletableFuture[]::new));
+    }
+
+    private void disposeRecoveryDescriptorsOfLeftNode(String id) {
+        Exception exceptionToFailSendFutures = new RecipientLeftException();
+
+        for (RecoveryDescriptor descriptor : 
descriptorProvider.getRecoveryDescriptorsByLaunchId(UUID.fromString(id))) {
+            blockAndDisposeDescriptor(descriptor, exceptionToFailSendFutures);
+        }
+    }
+
+    private CompletableFuture<Void> 
blockAndDisposeDescriptor(RecoveryDescriptor descriptor, Exception 
exceptionToFailSendFutures) {
+        while (!descriptor.block(exceptionToFailSendFutures)) {

Review Comment:
   Let's rename `block` to `tryBlock`, I think it's a more suitable name



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -500,19 +562,77 @@ public void initiateStopping() {
 
     /**
      * Closes physical connections with an Ignite node identified by the given 
ID (it's not consistentId,
-     * it's ID that gets regenerated at each node restart).
+     * it's ID that gets regenerated at each node restart) and recovery 
descriptors corresponding to it.
      *
-     * @param id ID of the node.
+     * @param id ID of the node (it must have already left the topology).
      */
-    public void closeConnectionsWith(String id) {
+    public void handleNodeLeft(String id) {
+        // We rely on the fact that the node with the given ID has already 
left the physical topology.
+        assert staleIdDetector.isIdStale(id) : id + " is not stale yet";
+
+        // TODO: IGNITE-21207 - remove descriptors for good.
+
+        connectionMaintenanceExecutor.submit(
+                () -> closeChannelsWith(id).whenCompleteAsync((res, ex) -> {
+                    // Closing descriptors separately (as some of them might 
not have an operating channel attached, but they
+                    // still might have unacknowledged messages/futures).
+                    disposeRecoveryDescriptorsOfLeftNode(id);
+                }, connectionMaintenanceExecutor)
+        );
+    }
+
+    private CompletableFuture<Void> closeChannelsWith(String id) {
         List<Entry<ConnectorKey<String>, NettySender>> entriesToRemove = 
channels.entrySet().stream()
                 .filter(entry -> entry.getValue().launchId().equals(id))
                 .collect(toList());
 
+        List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
         for (Entry<ConnectorKey<String>, NettySender> entry : entriesToRemove) 
{
-            entry.getValue().close();
+            closeFutures.add(entry.getValue().closeAsync());
 
             channels.remove(entry.getKey());
         }
+
+        return allOf(closeFutures.toArray(CompletableFuture[]::new));
+    }
+
+    private void disposeRecoveryDescriptorsOfLeftNode(String id) {
+        Exception exceptionToFailSendFutures = new RecipientLeftException();
+
+        for (RecoveryDescriptor descriptor : 
descriptorProvider.getRecoveryDescriptorsByLaunchId(UUID.fromString(id))) {
+            blockAndDisposeDescriptor(descriptor, exceptionToFailSendFutures);
+        }
+    }
+
+    private CompletableFuture<Void> 
blockAndDisposeDescriptor(RecoveryDescriptor descriptor, Exception 
exceptionToFailSendFutures) {
+        while (!descriptor.block(exceptionToFailSendFutures)) {
+            if (descriptor.isBlocked()) {
+                // Already blocked concurrently, nothing to do here, the one 
who blocked it will handle the disposal (or already did).
+                return nullCompletedFuture();
+            }
+
+            DescriptorAcquiry acquiry = descriptor.holder();

Review Comment:
   What does this situation mean? Please add a comment



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java:
##########
@@ -32,6 +34,25 @@
 
 /**
  * Recovery protocol descriptor.
+ *
+ * <p>Main state the descriptor holds (unacked messages queue and counters) is 
not protected by synchronization primitives

Review Comment:
   ```suggestion
    * <p>Main state held by a descriptor (unacked messages queue and counters) 
is not protected by synchronization primitives
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java:
##########
@@ -61,10 +60,16 @@ public NettySender(Channel channel, String launchId, String 
consistentId, short
      * Sends the message.
      *
      * @param obj Network message wrapper.
-     * @return Future of the send operation.
+     * @return Future of the send operation (that gets completed when the 
message gets acknowledged by the receiver).
      */
     public CompletableFuture<Void> send(OutNetworkObject obj) {
-        return toCompletableFuture(channel.writeAndFlush(obj));
+        CompletableFuture<Void> writeFuture = 
toCompletableFuture(channel.writeAndFlush(obj));
+
+        if (!obj.networkMessage().needAck()) {

Review Comment:
   I would use a ternary operator here, but it doesn't really matter



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -500,19 +562,77 @@ public void initiateStopping() {
 
     /**
      * Closes physical connections with an Ignite node identified by the given 
ID (it's not consistentId,
-     * it's ID that gets regenerated at each node restart).
+     * it's ID that gets regenerated at each node restart) and recovery 
descriptors corresponding to it.
      *
-     * @param id ID of the node.
+     * @param id ID of the node (it must have already left the topology).
      */
-    public void closeConnectionsWith(String id) {
+    public void handleNodeLeft(String id) {
+        // We rely on the fact that the node with the given ID has already 
left the physical topology.
+        assert staleIdDetector.isIdStale(id) : id + " is not stale yet";
+
+        // TODO: IGNITE-21207 - remove descriptors for good.
+
+        connectionMaintenanceExecutor.submit(
+                () -> closeChannelsWith(id).whenCompleteAsync((res, ex) -> {
+                    // Closing descriptors separately (as some of them might 
not have an operating channel attached, but they
+                    // still might have unacknowledged messages/futures).
+                    disposeRecoveryDescriptorsOfLeftNode(id);
+                }, connectionMaintenanceExecutor)
+        );
+    }
+
+    private CompletableFuture<Void> closeChannelsWith(String id) {
         List<Entry<ConnectorKey<String>, NettySender>> entriesToRemove = 
channels.entrySet().stream()
                 .filter(entry -> entry.getValue().launchId().equals(id))
                 .collect(toList());
 
+        List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
         for (Entry<ConnectorKey<String>, NettySender> entry : entriesToRemove) 
{
-            entry.getValue().close();
+            closeFutures.add(entry.getValue().closeAsync());
 
             channels.remove(entry.getKey());
         }
+
+        return allOf(closeFutures.toArray(CompletableFuture[]::new));
+    }
+
+    private void disposeRecoveryDescriptorsOfLeftNode(String id) {
+        Exception exceptionToFailSendFutures = new RecipientLeftException();
+
+        for (RecoveryDescriptor descriptor : 
descriptorProvider.getRecoveryDescriptorsByLaunchId(UUID.fromString(id))) {
+            blockAndDisposeDescriptor(descriptor, exceptionToFailSendFutures);
+        }
+    }
+
+    private CompletableFuture<Void> 
blockAndDisposeDescriptor(RecoveryDescriptor descriptor, Exception 
exceptionToFailSendFutures) {
+        while (!descriptor.block(exceptionToFailSendFutures)) {
+            if (descriptor.isBlocked()) {

Review Comment:
   This looks confusing to me. I would expect `descriptor.block` to return 
`true` if it was successfully blocked and `false` if it is already blocked by 
someone else, which would make this check redundant. But it looks like this is not the 
case. Please either add some comments or update the API.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -388,20 +432,38 @@ public void stop() {
             return;
         }
 
-        Stream<CompletableFuture<Void>> stream = Stream.concat(Stream.concat(
+        assert stopping.get();
+
+        Stream<CompletableFuture<Void>> stopFutures = Stream.concat(
+                Stream.concat(
                     clients.values().stream().map(NettyClient::stop),
                     Stream.of(server.stop())
                 ),
                 channels.values().stream().map(NettySender::closeAsync)
         );
+        stopFutures = Stream.concat(stopFutures, 
Stream.of(disposeDescriptors()));
 
-        CompletableFuture<Void> stopFut = 
CompletableFuture.allOf(stream.toArray(CompletableFuture<?>[]::new));
+        CompletableFuture<Void> finalStopFuture = 
allOf(stopFutures.toArray(CompletableFuture<?>[]::new));
 
         try {
-            stopFut.join();
+            finalStopFuture.join();
         } catch (Exception e) {
             LOG.warn("Failed to stop connection manager [reason={}]", 
e.getMessage());
         }
+
+        IgniteUtils.shutdownAndAwaitTermination(connectionMaintenanceExecutor, 
10, TimeUnit.SECONDS);
+    }
+
+    private CompletableFuture<Void> disposeDescriptors() {
+        Exception exceptionToFailSendFutures = new NodeStoppingException();
+
+        Collection<RecoveryDescriptor> descriptors = 
descriptorProvider.getAllRecoveryDescriptors();
+        List<CompletableFuture<Void>> disposeFutures = new 
ArrayList<>(descriptors.size());
+        for (RecoveryDescriptor descriptor : descriptors) {
+            disposeFutures.add(blockAndDisposeDescriptor(descriptor, 
exceptionToFailSendFutures));

Review Comment:
   Shall we do it in parallel? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to