sashapolo commented on code in PR #3009: URL: https://github.com/apache/ignite-3/pull/3009#discussion_r1445828142
########## modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/OutgoingAcknowledgementSilencer.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.network.netty; + +import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; + +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage; +import org.apache.ignite.network.OutNetworkObject; + +/** + * {@link io.netty.channel.ChannelOutboundHandler} that drops outgoing {@link AcknowledgementMessage}s. + */ +@Sharable +public class OutgoingAcknowledgementSilencer extends ChannelOutboundHandlerAdapter { + /** Name of this handler. 
*/ + static final String NAME = "acknowledgement-silencer"; + + private volatile boolean silenceAcks = true; + + private final AtomicInteger addedCount = new AtomicInteger(); Review Comment: I would suggest to use a `Phaser` instead ########## modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java: ########## @@ -434,6 +434,9 @@ public static class Network { /** Port is in use. */ public static final int PORT_IN_USE_ERR = NETWORK_ERR_GROUP.registerErrorCode((short) 2); + + /** Recipient node has left the physical topology. */ + public static final int RECIPIENT_LEFT_ERR = NETWORK_ERR_GROUP.registerErrorCode((short) 5); Review Comment: Why 5? ########## modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java: ########## @@ -246,8 +249,8 @@ public void testCanReconnectAfterFail() throws Exception { assertThrows(ClosedChannelException.class, () -> { try { - finalSender.send(new OutNetworkObject(testMessage, Collections.emptyList())).get(3, TimeUnit.SECONDS); - } catch (Exception e) { + finalSender.send(new OutNetworkObject(testMessage, emptyList())).get(3, TimeUnit.SECONDS); + } catch (ExecutionException e) { Review Comment: I believe there exists `assertThrowsWithCause` or something ########## modules/arch-test/src/test/java/org/apache/ignite/internal/IgniteExceptionArchTest.java: ########## @@ -89,6 +90,7 @@ public void check(JavaClass javaClass, ConditionEvents conditionEvents) { exclusions.add(NoRowSetExpectedException.class.getCanonicalName()); exclusions.add(InvalidCredentialsException.class.getCanonicalName()); exclusions.add(UnsupportedAuthenticationTypeException.class.getCanonicalName()); + exclusions.add(RecipientLeftException.class.getCanonicalName()); Review Comment: Not related to this PR, but can this initialization block be replaced with `Set.of`? 
########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java: ########## @@ -115,6 +128,8 @@ public class ConnectionManager implements ChannelCreationListener { /** Network Configuration. */ private final NetworkView networkConfiguration; + private final ExecutorService connectionMaintenanceExecutor; Review Comment: Please add a javadoc ########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java: ########## @@ -388,20 +432,38 @@ public void stop() { return; } - Stream<CompletableFuture<Void>> stream = Stream.concat(Stream.concat( + assert stopping.get(); + + Stream<CompletableFuture<Void>> stopFutures = Stream.concat( + Stream.concat( clients.values().stream().map(NettyClient::stop), Stream.of(server.stop()) ), channels.values().stream().map(NettySender::closeAsync) ); + stopFutures = Stream.concat(stopFutures, Stream.of(disposeDescriptors())); - CompletableFuture<Void> stopFut = CompletableFuture.allOf(stream.toArray(CompletableFuture<?>[]::new)); + CompletableFuture<Void> finalStopFuture = allOf(stopFutures.toArray(CompletableFuture<?>[]::new)); try { - stopFut.join(); + finalStopFuture.join(); } catch (Exception e) { LOG.warn("Failed to stop connection manager [reason={}]", e.getMessage()); } + + IgniteUtils.shutdownAndAwaitTermination(connectionMaintenanceExecutor, 10, TimeUnit.SECONDS); + } + + private CompletableFuture<Void> disposeDescriptors() { + Exception exceptionToFailSendFutures = new NodeStoppingException(); + + Collection<RecoveryDescriptor> descriptors = descriptorProvider.getAllRecoveryDescriptors(); + List<CompletableFuture<Void>> disposeFutures = new ArrayList<>(descriptors.size()); Review Comment: Can this be an array in the first place? 
########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java: ########## @@ -388,20 +432,38 @@ public void stop() { return; } - Stream<CompletableFuture<Void>> stream = Stream.concat(Stream.concat( + assert stopping.get(); + + Stream<CompletableFuture<Void>> stopFutures = Stream.concat( Review Comment: Can we replace this stream with a list and simply add futures to it? Current stream usage starts to look a little bit messy ########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java: ########## @@ -34,14 +39,26 @@ public class DefaultRecoveryDescriptorProvider implements RecoveryDescriptorProv /** Recovery descriptors. */ private final Map<ChannelKey, RecoveryDescriptor> recoveryDescriptors = new ConcurrentHashMap<>(); - /** {@inheritDoc} */ @Override public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID launchId, short connectionIndex) { var key = new ChannelKey(consistentId, launchId, connectionIndex); return recoveryDescriptors.computeIfAbsent(key, channelKey -> new RecoveryDescriptor(DEFAULT_QUEUE_LIMIT)); } + @Override + public Collection<RecoveryDescriptor> getRecoveryDescriptorsByLaunchId(UUID launchId) { + return recoveryDescriptors.entrySet().stream() + .filter(entry -> entry.getKey().launchId.equals(launchId)) + .map(Entry::getValue) + .collect(toList()); + } + + @Override + public Collection<RecoveryDescriptor> getAllRecoveryDescriptors() { + return new ArrayList<>(recoveryDescriptors.values()); Review Comment: I think that `List.copyOf` looks better here ########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java: ########## @@ -500,19 +562,77 @@ public void initiateStopping() { /** * Closes physical connections with an Ignite node identified by the given ID (it's not consistentId, - * it's ID that gets regenerated at each node restart). 
+ * it's ID that gets regenerated at each node restart) and recovery descriptors corresponding to it. * - * @param id ID of the node. + * @param id ID of the node (it must have already left the topology). */ - public void closeConnectionsWith(String id) { + public void handleNodeLeft(String id) { + // We rely on the fact that the node with the given ID has already left the physical topology. + assert staleIdDetector.isIdStale(id) : id + " is not stale yet"; + + // TODO: IGNITE-21207 - remove descriptors for good. + + connectionMaintenanceExecutor.submit( + () -> closeChannelsWith(id).whenCompleteAsync((res, ex) -> { + // Closing descriptors separately (as some of them might not have an operating channel attached, but they + // still might have unacknowledged messages/futures). + disposeRecoveryDescriptorsOfLeftNode(id); + }, connectionMaintenanceExecutor) + ); + } + + private CompletableFuture<Void> closeChannelsWith(String id) { List<Entry<ConnectorKey<String>, NettySender>> entriesToRemove = channels.entrySet().stream() .filter(entry -> entry.getValue().launchId().equals(id)) .collect(toList()); + List<CompletableFuture<Void>> closeFutures = new ArrayList<>(); Review Comment: Can this be an array? ########## modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryClientHandshakeManager.java: ########## @@ -165,14 +165,14 @@ public void onInit(ChannelHandlerContext handlerContext) { /** {@inheritDoc} */ @Override public void onMessage(NetworkMessage message) { - if (message instanceof HandshakeStartMessage) { Review Comment: Why did you change the order here? ########## modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java: ########## @@ -128,14 +161,13 @@ public long onReceive() { return receivedCount; } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(RecoveryDescriptor.class, this); } /** - * Release this descriptor. 
+ * Releases this descriptor if it's acquired by the given channel, otherwise does nothing. Review Comment: ```suggestion * Releases this descriptor if it's been acquired by the given channel, otherwise does nothing. ``` ########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java: ########## @@ -180,6 +195,15 @@ public ConnectionManager( ); this.clientBootstrap = bootstrapFactory.createClientBootstrap(); + + connectionMaintenanceExecutor = new ThreadPoolExecutor( Review Comment: Please add a small comment about why we are not using a `singleThreadExecutor` here ########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java: ########## @@ -500,19 +562,77 @@ public void initiateStopping() { /** * Closes physical connections with an Ignite node identified by the given ID (it's not consistentId, - * it's ID that gets regenerated at each node restart). + * it's ID that gets regenerated at each node restart) and recovery descriptors corresponding to it. * - * @param id ID of the node. + * @param id ID of the node (it must have already left the topology). */ - public void closeConnectionsWith(String id) { + public void handleNodeLeft(String id) { + // We rely on the fact that the node with the given ID has already left the physical topology. + assert staleIdDetector.isIdStale(id) : id + " is not stale yet"; + + // TODO: IGNITE-21207 - remove descriptors for good. + + connectionMaintenanceExecutor.submit( + () -> closeChannelsWith(id).whenCompleteAsync((res, ex) -> { + // Closing descriptors separately (as some of them might not have an operating channel attached, but they + // still might have unacknowledged messages/futures). 
+ disposeRecoveryDescriptorsOfLeftNode(id); + }, connectionMaintenanceExecutor) + ); + } + + private CompletableFuture<Void> closeChannelsWith(String id) { List<Entry<ConnectorKey<String>, NettySender>> entriesToRemove = channels.entrySet().stream() .filter(entry -> entry.getValue().launchId().equals(id)) .collect(toList()); + List<CompletableFuture<Void>> closeFutures = new ArrayList<>(); for (Entry<ConnectorKey<String>, NettySender> entry : entriesToRemove) { - entry.getValue().close(); + closeFutures.add(entry.getValue().closeAsync()); channels.remove(entry.getKey()); } + + return allOf(closeFutures.toArray(CompletableFuture[]::new)); + } + + private void disposeRecoveryDescriptorsOfLeftNode(String id) { + Exception exceptionToFailSendFutures = new RecipientLeftException(); + + for (RecoveryDescriptor descriptor : descriptorProvider.getRecoveryDescriptorsByLaunchId(UUID.fromString(id))) { + blockAndDisposeDescriptor(descriptor, exceptionToFailSendFutures); + } + } + + private CompletableFuture<Void> blockAndDisposeDescriptor(RecoveryDescriptor descriptor, Exception exceptionToFailSendFutures) { + while (!descriptor.block(exceptionToFailSendFutures)) { Review Comment: Let's rename `block` to `tryBlock`, I think it's a more suitable name ########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java: ########## @@ -500,19 +562,77 @@ public void initiateStopping() { /** * Closes physical connections with an Ignite node identified by the given ID (it's not consistentId, - * it's ID that gets regenerated at each node restart). + * it's ID that gets regenerated at each node restart) and recovery descriptors corresponding to it. * - * @param id ID of the node. + * @param id ID of the node (it must have already left the topology). */ - public void closeConnectionsWith(String id) { + public void handleNodeLeft(String id) { + // We rely on the fact that the node with the given ID has already left the physical topology. 
+ assert staleIdDetector.isIdStale(id) : id + " is not stale yet"; + + // TODO: IGNITE-21207 - remove descriptors for good. + + connectionMaintenanceExecutor.submit( + () -> closeChannelsWith(id).whenCompleteAsync((res, ex) -> { + // Closing descriptors separately (as some of them might not have an operating channel attached, but they + // still might have unacknowledged messages/futures). + disposeRecoveryDescriptorsOfLeftNode(id); + }, connectionMaintenanceExecutor) + ); + } + + private CompletableFuture<Void> closeChannelsWith(String id) { List<Entry<ConnectorKey<String>, NettySender>> entriesToRemove = channels.entrySet().stream() .filter(entry -> entry.getValue().launchId().equals(id)) .collect(toList()); + List<CompletableFuture<Void>> closeFutures = new ArrayList<>(); for (Entry<ConnectorKey<String>, NettySender> entry : entriesToRemove) { - entry.getValue().close(); + closeFutures.add(entry.getValue().closeAsync()); channels.remove(entry.getKey()); } + + return allOf(closeFutures.toArray(CompletableFuture[]::new)); + } + + private void disposeRecoveryDescriptorsOfLeftNode(String id) { + Exception exceptionToFailSendFutures = new RecipientLeftException(); + + for (RecoveryDescriptor descriptor : descriptorProvider.getRecoveryDescriptorsByLaunchId(UUID.fromString(id))) { + blockAndDisposeDescriptor(descriptor, exceptionToFailSendFutures); + } + } + + private CompletableFuture<Void> blockAndDisposeDescriptor(RecoveryDescriptor descriptor, Exception exceptionToFailSendFutures) { + while (!descriptor.block(exceptionToFailSendFutures)) { + if (descriptor.isBlocked()) { + // Already blocked concurrently, nothing to do here, the one who blocked it will handle the disposal (or already did). + return nullCompletedFuture(); + } + + DescriptorAcquiry acquiry = descriptor.holder(); Review Comment: What does this situation mean? 
Please add a comment ########## modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java: ########## @@ -32,6 +34,25 @@ /** * Recovery protocol descriptor. + * + * <p>Main state the descriptor holds (unacked messages queue and counters) is not protected by synchronization primitives Review Comment: ```suggestion * <p>Main state held by a descriptor (unacked messages queue and counters) is not protected by synchronization primitives ``` ########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java: ########## @@ -61,10 +60,16 @@ public NettySender(Channel channel, String launchId, String consistentId, short * Sends the message. * * @param obj Network message wrapper. - * @return Future of the send operation. + * @return Future of the send operation (that gets completed when the message gets acknowledged by the receiver). */ public CompletableFuture<Void> send(OutNetworkObject obj) { - return toCompletableFuture(channel.writeAndFlush(obj)); + CompletableFuture<Void> writeFuture = toCompletableFuture(channel.writeAndFlush(obj)); + + if (!obj.networkMessage().needAck()) { Review Comment: I would use a ternary operator here, but it doesn't really matter ########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java: ########## @@ -500,19 +562,77 @@ public void initiateStopping() { /** * Closes physical connections with an Ignite node identified by the given ID (it's not consistentId, - * it's ID that gets regenerated at each node restart). + * it's ID that gets regenerated at each node restart) and recovery descriptors corresponding to it. * - * @param id ID of the node. + * @param id ID of the node (it must have already left the topology). */ - public void closeConnectionsWith(String id) { + public void handleNodeLeft(String id) { + // We rely on the fact that the node with the given ID has already left the physical topology. 
+ assert staleIdDetector.isIdStale(id) : id + " is not stale yet"; + + // TODO: IGNITE-21207 - remove descriptors for good. + + connectionMaintenanceExecutor.submit( + () -> closeChannelsWith(id).whenCompleteAsync((res, ex) -> { + // Closing descriptors separately (as some of them might not have an operating channel attached, but they + // still might have unacknowledged messages/futures). + disposeRecoveryDescriptorsOfLeftNode(id); + }, connectionMaintenanceExecutor) + ); + } + + private CompletableFuture<Void> closeChannelsWith(String id) { List<Entry<ConnectorKey<String>, NettySender>> entriesToRemove = channels.entrySet().stream() .filter(entry -> entry.getValue().launchId().equals(id)) .collect(toList()); + List<CompletableFuture<Void>> closeFutures = new ArrayList<>(); for (Entry<ConnectorKey<String>, NettySender> entry : entriesToRemove) { - entry.getValue().close(); + closeFutures.add(entry.getValue().closeAsync()); channels.remove(entry.getKey()); } + + return allOf(closeFutures.toArray(CompletableFuture[]::new)); + } + + private void disposeRecoveryDescriptorsOfLeftNode(String id) { + Exception exceptionToFailSendFutures = new RecipientLeftException(); + + for (RecoveryDescriptor descriptor : descriptorProvider.getRecoveryDescriptorsByLaunchId(UUID.fromString(id))) { + blockAndDisposeDescriptor(descriptor, exceptionToFailSendFutures); + } + } + + private CompletableFuture<Void> blockAndDisposeDescriptor(RecoveryDescriptor descriptor, Exception exceptionToFailSendFutures) { + while (!descriptor.block(exceptionToFailSendFutures)) { + if (descriptor.isBlocked()) { Review Comment: This looks confusing to me. I would expect `descriptor.block` to return `true` if it was successfully blocked and `false` if it is already blocked by someone else, which makes this check redundant. But it looks like this is not the case. Please either add some comments or update the API. 
########## modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java: ########## @@ -388,20 +432,38 @@ public void stop() { return; } - Stream<CompletableFuture<Void>> stream = Stream.concat(Stream.concat( + assert stopping.get(); + + Stream<CompletableFuture<Void>> stopFutures = Stream.concat( + Stream.concat( clients.values().stream().map(NettyClient::stop), Stream.of(server.stop()) ), channels.values().stream().map(NettySender::closeAsync) ); + stopFutures = Stream.concat(stopFutures, Stream.of(disposeDescriptors())); - CompletableFuture<Void> stopFut = CompletableFuture.allOf(stream.toArray(CompletableFuture<?>[]::new)); + CompletableFuture<Void> finalStopFuture = allOf(stopFutures.toArray(CompletableFuture<?>[]::new)); try { - stopFut.join(); + finalStopFuture.join(); } catch (Exception e) { LOG.warn("Failed to stop connection manager [reason={}]", e.getMessage()); } + + IgniteUtils.shutdownAndAwaitTermination(connectionMaintenanceExecutor, 10, TimeUnit.SECONDS); + } + + private CompletableFuture<Void> disposeDescriptors() { + Exception exceptionToFailSendFutures = new NodeStoppingException(); + + Collection<RecoveryDescriptor> descriptors = descriptorProvider.getAllRecoveryDescriptors(); + List<CompletableFuture<Void>> disposeFutures = new ArrayList<>(descriptors.size()); + for (RecoveryDescriptor descriptor : descriptors) { + disposeFutures.add(blockAndDisposeDescriptor(descriptor, exceptionToFailSendFutures)); Review Comment: Shall we do it in parallel? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
