ibessonov commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r871081859


##########
modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java:
##########
@@ -24,7 +24,7 @@
  */
 public interface NetworkMessage {
     /** Size of the message type (in bytes), used during (de-)serialization. */
-    static final int MSG_TYPE_SIZE_BYTES = 4;
+    int MSG_TYPE_SIZE_BYTES = 4;

Review Comment:
   Did we implement a varlen type compression? Absolute majority of messages 
only require 2 bytes



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -72,43 +76,41 @@ public class ConnectionManager {
     /** Message listeners. */
     private final List<Consumer<InNetworkObject>> listeners = new 
CopyOnWriteArrayList<>();
 
+    private final UUID launchId;

Review Comment:
   Please add a small javadoc



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Default implementation of the {@link RecoveryDescriptorProvider}.
+ */
+public class DefaultRecoveryDescriptorProvider implements 
RecoveryDescriptorProvider {
+    /** Recovery descriptors. */
+    private final Map<ChannelKey, RecoveryDescriptor> recoveryDescriptors = 
new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override
+    public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID 
launchId, short connectionIndex, boolean inbound) {
+        var key = new ChannelKey(consistentId, launchId, connectionIndex, 
inbound);
+
+        return recoveryDescriptors.computeIfAbsent(key, channelKey -> new 
RecoveryDescriptor(10));

Review Comment:
   What is a "10"?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.util.Collections;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import 
org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Inbound handler that handles incoming acknowledgement messages and sends 
acknowledgement messages for other messages.
+ */
+public class InboundRecoveryHandler extends ChannelInboundHandlerAdapter {
+    /** Handler name. */
+    public static final String NAME = "inbound-recovery-handler";
+
+    /** Recovery descriptor. */
+    private final RecoveryDescriptor descriptor;
+
+    /** Messages factory. */
+    private final NetworkMessagesFactory factory;
+
+    /**
+     * Constructor.
+     *
+     * @param descriptor Recovery descriptor.
+     * @param factory Message factory.
+     */
+    public InboundRecoveryHandler(RecoveryDescriptor descriptor, 
NetworkMessagesFactory factory) {
+        this.descriptor = descriptor;
+        this.factory = factory;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+        NetworkMessage message = (NetworkMessage) msg;
+
+        if (message instanceof AcknowledgementMessage) {
+            AcknowledgementMessage ackMessage = (AcknowledgementMessage) msg;
+            long receivedMessages = ackMessage.receivedMessages();
+
+            descriptor.acknowledge(receivedMessages);
+        } else if (message.needAck()) {
+            AcknowledgementMessage ackMsg = factory.acknowledgementMessage()

Review Comment:
   So, you're saying that every message that needs an ack will receive it 
individually, right?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundRecoveryHandler.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.network.OutNetworkObject;
+
+/** Outbound handler that adds outgoing message to the recovery descriptor. */
+public class OutboundRecoveryHandler extends ChannelOutboundHandlerAdapter {
+    /** Handler name. */
+    public static final String NAME = "outbound-recovery-handler";
+
+    /** Recovery descriptor. */
+    private final RecoveryDescriptor descriptor;
+
+    /**
+     * Constructor.
+     *
+     * @param descriptor Recovery descriptor.
+     */
+    public OutboundRecoveryHandler(RecoveryDescriptor descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise 
promise) throws Exception {
+        OutNetworkObject outNetworkObject = (OutNetworkObject) msg;
+
+        if (outNetworkObject.shouldBeSavedForRecovery()) {
+            descriptor.add(outNetworkObject);

Review Comment:
   Another question - do we nullify serialized UOS fields? If we have already 
byte[] representations, then we don't need original objects. And, considering 
that we cache a bunch of messages, they **will** take a lot of space.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java:
##########
@@ -57,6 +57,10 @@ public class NetworkMessageTypes {
      */
     public static final short HANDSHAKE_START_RESPONSE = 4;
 
+
+    public static final short HANDSHAKE_FINISH = 8;

Review Comment:
   Formatting is messed up. Why do we have a gap in these numbers BTW?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Default implementation of the {@link RecoveryDescriptorProvider}.
+ */
+public class DefaultRecoveryDescriptorProvider implements 
RecoveryDescriptorProvider {
+    /** Recovery descriptors. */
+    private final Map<ChannelKey, RecoveryDescriptor> recoveryDescriptors = 
new ConcurrentHashMap<>();
+
+    /** {@inheritDoc} */
+    @Override
+    public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID 
launchId, short connectionIndex, boolean inbound) {
+        var key = new ChannelKey(consistentId, launchId, connectionIndex, 
inbound);
+
+        return recoveryDescriptors.computeIfAbsent(key, channelKey -> new 
RecoveryDescriptor(10));
+    }
+
+    /** Channel key. */
+    private static class ChannelKey {
+        /** Remote node's consistent id. */
+        private final String consistentId;
+
+        /** Remote node's launch id. */
+        private final UUID launchId;
+
+        /**
+         * Connection id. Every connection between this node and a remote node 
has a unique connection id,
+         * but connections with different nodes may have same ids.
+         */
+        private final short connectionId;
+
+        /** {@code true} if channel is inbound, {@code false} otherwise. */
+        private final boolean inbound;
+
+        private ChannelKey(String consistentId, UUID launchId, short 
connectionId, boolean inbound) {
+            this.consistentId = consistentId;
+            this.launchId = launchId;
+            this.connectionId = connectionId;
+            this.inbound = inbound;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ChannelKey that = (ChannelKey) o;
+            return connectionId == that.connectionId && inbound == 
that.inbound && consistentId.equals(that.consistentId)
+                    && launchId.equals(
+                    that.launchId);
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public int hashCode() {
+            return Objects.hash(consistentId, launchId, connectionId, inbound);

Review Comment:
   Does JIT inline this method? I'm curious



##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Recovery protocol descriptor.
+ */
+public class RecoveryDescriptor {
+    /** Unacknowledged messages. */
+    private final ArrayDeque<OutNetworkObject> unacknowledgedMessages;
+
+    /** Count of sent messages. */
+    private long sentCount;
+
+    /** Count of acknowledged sent messages. */
+    private long acknowledgedCount;
+
+    /** Count of received messages. */
+    private long receivedCount;
+
+    /**
+     * Constructor.
+     *
+     * @param queueLimit Maximum size of unacknowledged messages queue.
+     */
+    public RecoveryDescriptor(int queueLimit) {
+        this.unacknowledgedMessages = new ArrayDeque<>(queueLimit);
+    }
+
+    /**
+     * Returns count of received messages.
+     *
+     * @return Count of received messages.
+     */
+    public long receivedCount() {
+        return receivedCount;
+    }
+
+    /**
+     * Acknowledges that sent messages were received by the remote node.
+     *
+     * @param messagesReceivedByRemote Number of all messages received by the 
remote node.
+     */
+    public void acknowledge(long messagesReceivedByRemote) {
+        while (acknowledgedCount < messagesReceivedByRemote) {
+            OutNetworkObject req = unacknowledgedMessages.pollFirst();
+
+            assert req != null;
+
+            acknowledgedCount++;
+        }
+    }
+
+    /**
+     * Returns the number of the messages unacknowledged by the remote node.
+     *
+     * @return The number of the messages unacknowledged by the remote node.
+     */
+    public long unacknowledgedCount() {
+        long res = sentCount - acknowledgedCount;
+
+        assert res >= 0;
+        assert res == unacknowledgedMessages.size();
+
+        return res;
+    }
+
+    /**
+     * Returns unacknowledged messages.
+     *
+     * @return Unacknowledged messages.
+     */
+    public List<OutNetworkObject> unacknowledgedMessages() {
+        return new ArrayList<>(unacknowledgedMessages);
+    }
+
+    /**
+     * Adds a sent message.
+     *
+     * @param msg Message.
+     */
+    public void add(OutNetworkObject msg) {
+        msg.shouldBeSavedForRecovery(false);
+        sentCount++;
+        unacknowledgedMessages.addLast(msg);

Review Comment:
   Queue has a size limit. What happens if you can't fit a new message?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -72,43 +76,41 @@ public class ConnectionManager {
     /** Message listeners. */
     private final List<Consumer<InNetworkObject>> listeners = new 
CopyOnWriteArrayList<>();
 
+    private final UUID launchId;
+
     /** Node consistent id. */
     private final String consistentId;
 
-    /** Client handshake manager factory. */
-    private final Supplier<HandshakeManager> clientHandshakeManagerFactory;
-
     /** Start flag. */
     private final AtomicBoolean started = new AtomicBoolean(false);
 
     /** Stop flag. */
     private final AtomicBoolean stopped = new AtomicBoolean(false);
 
+    private final RecoveryDescriptorProvider descriptorProvider = new 
DefaultRecoveryDescriptorProvider();
+
     /**
      * Constructor.
      *
      * @param networkConfiguration          Network configuration.
      * @param serializationService          Serialization service.
      * @param consistentId                  Consistent id of this node.
-     * @param serverHandshakeManagerFactory Server handshake manager factory.
-     * @param clientHandshakeManagerFactory Client handshake manager factory.
      * @param bootstrapFactory              Bootstrap factory.
      */
     public ConnectionManager(
             NetworkView networkConfiguration,
             SerializationService serializationService,
+            UUID launchId,

Review Comment:
   New parameter is not documented



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import 
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
+
+/** Pipeline utils. */
+public class PipelineUtils {
+    /** {@link ChunkedWriteHandler}'s name. */
+    private static final String CHUNKED_WRITE_HANDLER_NAME = 
"chunked-write-handler";
+
+    /**
+     * Sets up initial pipeline.
+     *
+     * @param pipeline Channel pipeline.
+     * @param serializationService Serialization service.
+     * @param handshakeManager Handshake manager.
+     * @param messageListener Message listener.
+     */
+    public static void setup(ChannelPipeline pipeline, 
PerSessionSerializationService serializationService,
+                HandshakeManager handshakeManager, Consumer<InNetworkObject> 
messageListener) {
+        pipeline.addLast(InboundDecoder.NAME, new 
InboundDecoder(serializationService));
+        pipeline.addLast(HandshakeHandler.NAME, new 
HandshakeHandler(handshakeManager, messageListener, serializationService));
+        pipeline.addLast(CHUNKED_WRITE_HANDLER_NAME, new 
ChunkedWriteHandler());
+        pipeline.addLast(OutboundEncoder.NAME, new 
OutboundEncoder(serializationService));
+        pipeline.addLast(IoExceptionSuppressingHandler.NAME, new 
IoExceptionSuppressingHandler());
+    }
+
+    /**
+     * Changes pipeline after the handshake.
+     *
+     * @param pipeline Pipeline.
+     * @param descriptor Recovery descriptor.
+     * @param messageHandler Message handler.
+     * @param factory Message factory.
+     */
+    public static void afterHandshake(
+            ChannelPipeline pipeline,
+            RecoveryDescriptor descriptor,
+            MessageHandler messageHandler,
+            NetworkMessagesFactory factory
+    ) {
+        pipeline.addAfter(OutboundEncoder.NAME, OutboundRecoveryHandler.NAME, 
new OutboundRecoveryHandler(descriptor));
+        pipeline.addBefore(HandshakeHandler.NAME, InboundRecoveryHandler.NAME, 
new InboundRecoveryHandler(descriptor, factory));
+        pipeline.addAfter(HandshakeHandler.NAME, MessageHandler.NAME, 
messageHandler);
+    }
+

Review Comment:
   Please remove these empty lines :)



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.util.Collections;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import 
org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Inbound handler that handles incoming acknowledgement messages and sends 
acknowledgement messages for other messages.
+ */
+public class InboundRecoveryHandler extends ChannelInboundHandlerAdapter {
+    /** Handler name. */
+    public static final String NAME = "inbound-recovery-handler";
+
+    /** Recovery descriptor. */
+    private final RecoveryDescriptor descriptor;
+
+    /** Messages factory. */
+    private final NetworkMessagesFactory factory;
+
+    /**
+     * Constructor.
+     *
+     * @param descriptor Recovery descriptor.
+     * @param factory Message factory.
+     */
+    public InboundRecoveryHandler(RecoveryDescriptor descriptor, 
NetworkMessagesFactory factory) {
+        this.descriptor = descriptor;
+        this.factory = factory;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+        NetworkMessage message = (NetworkMessage) msg;
+
+        if (message instanceof AcknowledgementMessage) {
+            AcknowledgementMessage ackMessage = (AcknowledgementMessage) msg;
+            long receivedMessages = ackMessage.receivedMessages();
+
+            descriptor.acknowledge(receivedMessages);
+        } else if (message.needAck()) {
+            AcknowledgementMessage ackMsg = factory.acknowledgementMessage()

Review Comment:
   I believe we should revisit it, maybe not here. Node should passively send 
its counter every N messages or when idling. I would be cool to compare current 
approach with a "new" one, by measuring throughput and latency percentiles.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to