rpuch commented on code in PR #3009:
URL: https://github.com/apache/ignite-3/pull/3009#discussion_r1446968066


##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -500,19 +562,77 @@ public void initiateStopping() {
 
     /**
      * Closes physical connections with an Ignite node identified by the given ID (it's not consistentId,
-     * it's ID that gets regenerated at each node restart).
+     * it's ID that gets regenerated at each node restart) and recovery descriptors corresponding to it.
      *
-     * @param id ID of the node.
+     * @param id ID of the node (it must have already left the topology).
      */
-    public void closeConnectionsWith(String id) {
+    public void handleNodeLeft(String id) {
+        // We rely on the fact that the node with the given ID has already left the physical topology.
+        assert staleIdDetector.isIdStale(id) : id + " is not stale yet";
+
+        // TODO: IGNITE-21207 - remove descriptors for good.
+
+        connectionMaintenanceExecutor.submit(
+                () -> closeChannelsWith(id).whenCompleteAsync((res, ex) -> {
+                    // Closing descriptors separately (as some of them might not have an operating channel attached, but they
+                    // still might have unacknowledged messages/futures).
+                    disposeRecoveryDescriptorsOfLeftNode(id);
+                }, connectionMaintenanceExecutor)
+        );
+    }
+
+    private CompletableFuture<Void> closeChannelsWith(String id) {
         List<Entry<ConnectorKey<String>, NettySender>> entriesToRemove = channels.entrySet().stream()
                 .filter(entry -> entry.getValue().launchId().equals(id))
                 .collect(toList());
 
+        List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
         for (Entry<ConnectorKey<String>, NettySender> entry : entriesToRemove) {
-            entry.getValue().close();
+            closeFutures.add(entry.getValue().closeAsync());
 
             channels.remove(entry.getKey());
         }
+
+        return allOf(closeFutures.toArray(CompletableFuture[]::new));
+    }
+
+    private void disposeRecoveryDescriptorsOfLeftNode(String id) {
+        Exception exceptionToFailSendFutures = new RecipientLeftException();
+
+        for (RecoveryDescriptor descriptor : descriptorProvider.getRecoveryDescriptorsByLaunchId(UUID.fromString(id))) {
+            blockAndDisposeDescriptor(descriptor, exceptionToFailSendFutures);
+        }
+    }
+
+    private CompletableFuture<Void> blockAndDisposeDescriptor(RecoveryDescriptor descriptor, Exception exceptionToFailSendFutures) {
+        while (!descriptor.block(exceptionToFailSendFutures)) {
+            if (descriptor.isBlocked()) {

Review Comment:
   `block()` fails if the descriptor is *acquired*, which may mean that it's 
actually *blocked* or just *acquired* normally by a `Channel`. We might think 
of a descriptor as having 3 possible states. I'm not sure a reader can 
expect any particular behavior here without reading the javadocs.
   
   I still renamed the methods to include `Forever` in their names; I hope this 
eliminates the confusion.
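
   To make the three-state idea concrete, here is a minimal sketch. It is not 
the actual `RecoveryDescriptor`: the class name, the state names and the method 
names (`tryAcquire`, `release`, `tryBlockForever`, `isBlockedForever`) are all 
hypothetical and only mirror the behavior described above, where blocking 
fails whenever the descriptor is not free.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a recovery descriptor; NOT the real Ignite class. */
final class DescriptorSketch {
    /** The three states a descriptor can be in (names are assumptions). */
    enum State { FREE, ACQUIRED_BY_CHANNEL, BLOCKED_FOREVER }

    private final AtomicReference<State> state = new AtomicReference<>(State.FREE);

    /** A channel takes ownership; succeeds only if the descriptor is free. */
    boolean tryAcquire() {
        return state.compareAndSet(State.FREE, State.ACQUIRED_BY_CHANNEL);
    }

    /** The owning channel lets go; a blocked descriptor stays blocked. */
    void release() {
        state.compareAndSet(State.ACQUIRED_BY_CHANNEL, State.FREE);
    }

    /**
     * Blocks the descriptor for good; succeeds only if it is free.
     * A false result means "not free": held by a channel OR already blocked.
     */
    boolean tryBlockForever() {
        return state.compareAndSet(State.FREE, State.BLOCKED_FOREVER);
    }

    /** Lets a caller tell the two "not free" cases apart. */
    boolean isBlockedForever() {
        return state.get() == State.BLOCKED_FOREVER;
    }
}
```

   In this sketch, a `false` from `tryBlockForever()` alone does not tell the 
caller which of the two non-free states it hit, which is why a retry loop would 
need a follow-up check such as `isBlockedForever()`.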



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
