sashapolo commented on code in PR #3103:
URL: https://github.com/apache/ignite-3/pull/3103#discussion_r1469380024
##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java:
##########
@@ -41,31 +48,131 @@ public class NettySender {
private final short channelId;
+ private final RecoveryDescriptor recoveryDescriptor;
+
/**
* Constructor.
*
* @param channel Netty channel.
* @param launchId Launch id of the remote node.
* @param consistentId Consistent id of the remote node.
* @param channelId channel identifier.
+ * @param recoveryDescriptor Descriptor corresponding to the current
logical connection.
*/
- public NettySender(Channel channel, String launchId, String consistentId,
short channelId) {
+ public NettySender(Channel channel, String launchId, String consistentId,
short channelId, RecoveryDescriptor recoveryDescriptor) {
this.channel = channel;
this.launchId = launchId;
this.consistentId = consistentId;
this.channelId = channelId;
+ this.recoveryDescriptor = recoveryDescriptor;
}
/**
* Sends the message.
*
+ * <p>NB: the future returned by this method might be completed by a
channel different from the channel encapsulated
+ * in the current sender (this might happen if the 'current' channel gets
closed and another one is opened
+ * in the same logical connection and resends a not-yet-acknowledged
message sent via the old channel).
+ *
* @param obj Network message wrapper.
* @return Future of the send operation (that gets completed when the
message gets acknowledged by the receiver).
*/
+ @TestOnly
public CompletableFuture<Void> send(OutNetworkObject obj) {
+ return send(obj, () -> {});
+ }
+
+ /**
+ * Sends the message.
+ *
+ * <p>NB: the future returned by this method might be completed by a
channel different from the channel encapsulated
+ * in the current sender (this might happen if the 'current' channel gets
closed and another one is opened
+ * in the same logical connection and resends a not-yet-acknowledged
message sent via the old channel).
+ *
+ * @param obj Network message wrapper.
+ * @param triggerChannelRecreation Used to trigger channel recreation
(when it turns out that the underlying channel is closed
+ * and the connection recovery procedure has to be performed).
+ * @return Future of the send operation (that gets completed when the
message gets acknowledged by the receiver).
+ */
+ public CompletableFuture<Void> send(OutNetworkObject obj, Runnable
triggerChannelRecreation) {
+ if (!obj.networkMessage().needAck()) {
+ // We don't care that the client might get an exception like
ClosedChannelException or that the message
+ // will be lost if the channel is closed as it does not require to
be acked.
+ return toCompletableFuture(channel.writeAndFlush(obj));
+ }
+
+ // Write in event loop to make sure that, if a ClosedSocketException
happens, we recover from it without existing the event loop.
Review Comment:
```suggestion
// Write in event loop to make sure that, if a ClosedSocketException happens, we recover from it without exiting the event loop.
```
##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java:
##########
@@ -41,31 +48,131 @@ public class NettySender {
private final short channelId;
+ private final RecoveryDescriptor recoveryDescriptor;
+
/**
* Constructor.
*
* @param channel Netty channel.
* @param launchId Launch id of the remote node.
* @param consistentId Consistent id of the remote node.
* @param channelId channel identifier.
+ * @param recoveryDescriptor Descriptor corresponding to the current
logical connection.
*/
- public NettySender(Channel channel, String launchId, String consistentId,
short channelId) {
+ public NettySender(Channel channel, String launchId, String consistentId,
short channelId, RecoveryDescriptor recoveryDescriptor) {
this.channel = channel;
this.launchId = launchId;
this.consistentId = consistentId;
this.channelId = channelId;
+ this.recoveryDescriptor = recoveryDescriptor;
}
/**
* Sends the message.
*
+ * <p>NB: the future returned by this method might be completed by a
channel different from the channel encapsulated
+ * in the current sender (this might happen if the 'current' channel gets
closed and another one is opened
+ * in the same logical connection and resends a not-yet-acknowledged
message sent via the old channel).
+ *
* @param obj Network message wrapper.
* @return Future of the send operation (that gets completed when the
message gets acknowledged by the receiver).
*/
+ @TestOnly
public CompletableFuture<Void> send(OutNetworkObject obj) {
+ return send(obj, () -> {});
+ }
+
+ /**
+ * Sends the message.
+ *
+ * <p>NB: the future returned by this method might be completed by a
channel different from the channel encapsulated
+ * in the current sender (this might happen if the 'current' channel gets
closed and another one is opened
+ * in the same logical connection and resends a not-yet-acknowledged
message sent via the old channel).
+ *
+ * @param obj Network message wrapper.
+ * @param triggerChannelRecreation Used to trigger channel recreation
(when it turns out that the underlying channel is closed
+ * and the connection recovery procedure has to be performed).
+ * @return Future of the send operation (that gets completed when the
message gets acknowledged by the receiver).
+ */
+ public CompletableFuture<Void> send(OutNetworkObject obj, Runnable
triggerChannelRecreation) {
+ if (!obj.networkMessage().needAck()) {
+ // We don't care that the client might get an exception like
ClosedChannelException or that the message
+ // will be lost if the channel is closed as it does not require to
be acked.
+ return toCompletableFuture(channel.writeAndFlush(obj));
+ }
+
+ // Write in event loop to make sure that, if a ClosedSocketException
happens, we recover from it without existing the event loop.
+ // We need this to avoid message reordering due to switching from old
channel to a new one.
+ if (channel.eventLoop().inEventLoop()) {
+ writeWithRecovery(obj, channel, triggerChannelRecreation);
+ } else {
+ channel.eventLoop().execute(() -> writeWithRecovery(obj, channel,
triggerChannelRecreation));
+ }
+
+ return obj.acknowledgedFuture();
+ }
+
+ private void chainRecoverSendAfterChannelClosure(
+ CompletableFuture<Void> writeFuture,
+ OutNetworkObject obj,
+ Channel currentChannel,
+ Runnable triggerChannelRecreation
+ ) {
+ if (!completedSuccessfully(writeFuture)) {
+ writeFuture.whenComplete((res, ex) -> {
+ if (ex instanceof ClosedChannelException) {
+ try {
+ recoverSendAfterChannelClosure(obj, currentChannel,
triggerChannelRecreation);
+ } catch (RuntimeException | AssertionError e) {
+ LOG.error("An error while sending a message {}", e,
obj.networkMessage());
+ }
+ }
+ });
+ }
+ }
+
+ private static boolean completedSuccessfully(CompletableFuture<Void>
writeFuture) {
+ return writeFuture.isDone() && !writeFuture.isCompletedExceptionally()
&& !writeFuture.isCancelled();
+ }
+
+ private void recoverSendAfterChannelClosure(OutNetworkObject obj, Channel
currentChannel, Runnable triggerChannelRecreation) {
+ assert NettyBootstrapFactory.isInNetworkThread() : "In a non-netty
thread " + Thread.currentThread();
+
+ // As we are in the channel event loop and all channels of the same
logical connection use the same event loop,
+ // we can be sure that nothing related to our channel, recovery
descriptor and possible new channel can change
+ // concurrently.
+
+ Channel holderChannel = recoveryDescriptor.holderChannel();
+
+ if (holderChannel == null || holderChannel == currentChannel) {
+ // Our channel is being closed or is already completely closed,
its pipeline might be (or not be) destroyed.
+ // But no new channel acquired the descriptor, so we are alone.
+
+ if (obj.shouldBeSavedForRecovery()) {
+ // It was not saved yet (as normally OutboundRecoveryHandler
adds it to the unacknowledged messages queue, but
Review Comment:
```suggestion
// It was not saved yet (as normally OutboundRecoveryHandler adds it to the unacknowledged messages queue), but
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]