ibessonov commented on code in PR #804:
URL: https://github.com/apache/ignite-3/pull/804#discussion_r871081859
##########
modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessage.java:
##########
@@ -24,7 +24,7 @@
*/
public interface NetworkMessage {
/** Size of the message type (in bytes), used during (de-)serialization. */
- static final int MSG_TYPE_SIZE_BYTES = 4;
+ int MSG_TYPE_SIZE_BYTES = 4;
Review Comment:
Did we implement a varlen type compression? Absolute majority of messages
only require 2 bytes
##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -72,43 +76,41 @@ public class ConnectionManager {
/** Message listeners. */
private final List<Consumer<InNetworkObject>> listeners = new
CopyOnWriteArrayList<>();
+ private final UUID launchId;
Review Comment:
Please add a small javadoc
##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Default implementation of the {@link RecoveryDescriptorProvider}.
+ */
+public class DefaultRecoveryDescriptorProvider implements
RecoveryDescriptorProvider {
+ /** Recovery descriptors. */
+ private final Map<ChannelKey, RecoveryDescriptor> recoveryDescriptors =
new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override
+ public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID
launchId, short connectionIndex, boolean inbound) {
+ var key = new ChannelKey(consistentId, launchId, connectionIndex,
inbound);
+
+ return recoveryDescriptors.computeIfAbsent(key, channelKey -> new
RecoveryDescriptor(10));
Review Comment:
What is a "10"?
##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.util.Collections;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import
org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Inbound handler that handles incoming acknowledgement messages and sends
acknowledgement messages for other messages.
+ */
+public class InboundRecoveryHandler extends ChannelInboundHandlerAdapter {
+ /** Handler name. */
+ public static final String NAME = "inbound-recovery-handler";
+
+ /** Recovery descriptor. */
+ private final RecoveryDescriptor descriptor;
+
+ /** Messages factory. */
+ private final NetworkMessagesFactory factory;
+
+ /**
+ * Constructor.
+ *
+ * @param descriptor Recovery descriptor.
+ * @param factory Message factory.
+ */
+ public InboundRecoveryHandler(RecoveryDescriptor descriptor,
NetworkMessagesFactory factory) {
+ this.descriptor = descriptor;
+ this.factory = factory;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
+ NetworkMessage message = (NetworkMessage) msg;
+
+ if (message instanceof AcknowledgementMessage) {
+ AcknowledgementMessage ackMessage = (AcknowledgementMessage) msg;
+ long receivedMessages = ackMessage.receivedMessages();
+
+ descriptor.acknowledge(receivedMessages);
+ } else if (message.needAck()) {
+ AcknowledgementMessage ackMsg = factory.acknowledgementMessage()
Review Comment:
So, you're saying that every message that needs an ack will receive it
individually, right?
##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/OutboundRecoveryHandler.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.network.OutNetworkObject;
+
+/** Outbound handler that adds outgoing message to the recovery descriptor. */
+public class OutboundRecoveryHandler extends ChannelOutboundHandlerAdapter {
+ /** Handler name. */
+ public static final String NAME = "outbound-recovery-handler";
+
+ /** Recovery descriptor. */
+ private final RecoveryDescriptor descriptor;
+
+ /**
+ * Constructor.
+ *
+ * @param descriptor Recovery descriptor.
+ */
+ public OutboundRecoveryHandler(RecoveryDescriptor descriptor) {
+ this.descriptor = descriptor;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise) throws Exception {
+ OutNetworkObject outNetworkObject = (OutNetworkObject) msg;
+
+ if (outNetworkObject.shouldBeSavedForRecovery()) {
+ descriptor.add(outNetworkObject);
Review Comment:
Another question - do we nullify serialized UOS fields? If we have already
byte[] representations, then we don't need original objects. And, considering
that we cache a bunch of messages, they **will** take a lot of space.
##########
modules/network/src/main/java/org/apache/ignite/internal/network/NetworkMessageTypes.java:
##########
@@ -57,6 +57,10 @@ public class NetworkMessageTypes {
*/
public static final short HANDSHAKE_START_RESPONSE = 4;
+
+ public static final short HANDSHAKE_FINISH = 8;
Review Comment:
Formatting is messed up. Why do we have a gap in these numbers BTW?
##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/DefaultRecoveryDescriptorProvider.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptorProvider;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Default implementation of the {@link RecoveryDescriptorProvider}.
+ */
+public class DefaultRecoveryDescriptorProvider implements
RecoveryDescriptorProvider {
+ /** Recovery descriptors. */
+ private final Map<ChannelKey, RecoveryDescriptor> recoveryDescriptors =
new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override
+ public RecoveryDescriptor getRecoveryDescriptor(String consistentId, UUID
launchId, short connectionIndex, boolean inbound) {
+ var key = new ChannelKey(consistentId, launchId, connectionIndex,
inbound);
+
+ return recoveryDescriptors.computeIfAbsent(key, channelKey -> new
RecoveryDescriptor(10));
+ }
+
+ /** Channel key. */
+ private static class ChannelKey {
+ /** Remote node's consistent id. */
+ private final String consistentId;
+
+ /** Remote node's launch id. */
+ private final UUID launchId;
+
+ /**
+ * Connection id. Every connection between this node and a remote node
has a unique connection id,
+ * but connections with different nodes may have same ids.
+ */
+ private final short connectionId;
+
+ /** {@code true} if channel is inbound, {@code false} otherwise. */
+ private final boolean inbound;
+
+ private ChannelKey(String consistentId, UUID launchId, short
connectionId, boolean inbound) {
+ this.consistentId = consistentId;
+ this.launchId = launchId;
+ this.connectionId = connectionId;
+ this.inbound = inbound;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ChannelKey that = (ChannelKey) o;
+ return connectionId == that.connectionId && inbound ==
that.inbound && consistentId.equals(that.consistentId)
+ && launchId.equals(
+ that.launchId);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int hashCode() {
+ return Objects.hash(consistentId, launchId, connectionId, inbound);
Review Comment:
Does JIT inline this method? I'm curious
##########
modules/network/src/main/java/org/apache/ignite/internal/network/recovery/RecoveryDescriptor.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.recovery;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Recovery protocol descriptor.
+ */
+public class RecoveryDescriptor {
+ /** Unacknowledged messages. */
+ private final ArrayDeque<OutNetworkObject> unacknowledgedMessages;
+
+ /** Count of sent messages. */
+ private long sentCount;
+
+ /** Count of acknowledged sent messages. */
+ private long acknowledgedCount;
+
+ /** Count of received messages. */
+ private long receivedCount;
+
+ /**
+ * Constructor.
+ *
+ * @param queueLimit Maximum size of unacknowledged messages queue.
+ */
+ public RecoveryDescriptor(int queueLimit) {
+ this.unacknowledgedMessages = new ArrayDeque<>(queueLimit);
+ }
+
+ /**
+ * Returns count of received messages.
+ *
+ * @return Count of received messages.
+ */
+ public long receivedCount() {
+ return receivedCount;
+ }
+
+ /**
+ * Acknowledges that sent messages were received by the remote node.
+ *
+ * @param messagesReceivedByRemote Number of all messages received by the
remote node.
+ */
+ public void acknowledge(long messagesReceivedByRemote) {
+ while (acknowledgedCount < messagesReceivedByRemote) {
+ OutNetworkObject req = unacknowledgedMessages.pollFirst();
+
+ assert req != null;
+
+ acknowledgedCount++;
+ }
+ }
+
+ /**
+ * Returns the number of the messages unacknowledged by the remote node.
+ *
+ * @return The number of the messages unacknowledged by the remote node.
+ */
+ public long unacknowledgedCount() {
+ long res = sentCount - acknowledgedCount;
+
+ assert res >= 0;
+ assert res == unacknowledgedMessages.size();
+
+ return res;
+ }
+
+ /**
+ * Returns unacknowledged messages.
+ *
+ * @return Unacknowledged messages.
+ */
+ public List<OutNetworkObject> unacknowledgedMessages() {
+ return new ArrayList<>(unacknowledgedMessages);
+ }
+
+ /**
+ * Adds a sent message.
+ *
+ * @param msg Message.
+ */
+ public void add(OutNetworkObject msg) {
+ msg.shouldBeSavedForRecovery(false);
+ sentCount++;
+ unacknowledgedMessages.addLast(msg);
Review Comment:
Queue has a size limit. What happens if you can't fit a new message?
##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -72,43 +76,41 @@ public class ConnectionManager {
/** Message listeners. */
private final List<Consumer<InNetworkObject>> listeners = new
CopyOnWriteArrayList<>();
+ private final UUID launchId;
+
/** Node consistent id. */
private final String consistentId;
- /** Client handshake manager factory. */
- private final Supplier<HandshakeManager> clientHandshakeManagerFactory;
-
/** Start flag. */
private final AtomicBoolean started = new AtomicBoolean(false);
/** Stop flag. */
private final AtomicBoolean stopped = new AtomicBoolean(false);
+ private final RecoveryDescriptorProvider descriptorProvider = new
DefaultRecoveryDescriptorProvider();
+
/**
* Constructor.
*
* @param networkConfiguration Network configuration.
* @param serializationService Serialization service.
* @param consistentId Consistent id of this node.
- * @param serverHandshakeManagerFactory Server handshake manager factory.
- * @param clientHandshakeManagerFactory Client handshake manager factory.
* @param bootstrapFactory Bootstrap factory.
*/
public ConnectionManager(
NetworkView networkConfiguration,
SerializationService serializationService,
+ UUID launchId,
Review Comment:
New parameter is not documented
##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/PipelineUtils.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.handshake.HandshakeManager;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import
org.apache.ignite.internal.network.serialization.PerSessionSerializationService;
+
+/** Pipeline utils. */
+public class PipelineUtils {
+ /** {@link ChunkedWriteHandler}'s name. */
+ private static final String CHUNKED_WRITE_HANDLER_NAME =
"chunked-write-handler";
+
+ /**
+ * Sets up initial pipeline.
+ *
+ * @param pipeline Channel pipeline.
+ * @param serializationService Serialization service.
+ * @param handshakeManager Handshake manager.
+ * @param messageListener Message listener.
+ */
+ public static void setup(ChannelPipeline pipeline,
PerSessionSerializationService serializationService,
+ HandshakeManager handshakeManager, Consumer<InNetworkObject>
messageListener) {
+ pipeline.addLast(InboundDecoder.NAME, new
InboundDecoder(serializationService));
+ pipeline.addLast(HandshakeHandler.NAME, new
HandshakeHandler(handshakeManager, messageListener, serializationService));
+ pipeline.addLast(CHUNKED_WRITE_HANDLER_NAME, new
ChunkedWriteHandler());
+ pipeline.addLast(OutboundEncoder.NAME, new
OutboundEncoder(serializationService));
+ pipeline.addLast(IoExceptionSuppressingHandler.NAME, new
IoExceptionSuppressingHandler());
+ }
+
+ /**
+ * Changes pipeline after the handshake.
+ *
+ * @param pipeline Pipeline.
+ * @param descriptor Recovery descriptor.
+ * @param messageHandler Message handler.
+ * @param factory Message factory.
+ */
+ public static void afterHandshake(
+ ChannelPipeline pipeline,
+ RecoveryDescriptor descriptor,
+ MessageHandler messageHandler,
+ NetworkMessagesFactory factory
+ ) {
+ pipeline.addAfter(OutboundEncoder.NAME, OutboundRecoveryHandler.NAME,
new OutboundRecoveryHandler(descriptor));
+ pipeline.addBefore(HandshakeHandler.NAME, InboundRecoveryHandler.NAME,
new InboundRecoveryHandler(descriptor, factory));
+ pipeline.addAfter(HandshakeHandler.NAME, MessageHandler.NAME,
messageHandler);
+ }
+
Review Comment:
Please remove these empty lines :)
##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/InboundRecoveryHandler.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import java.util.Collections;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import
org.apache.ignite.internal.network.recovery.message.AcknowledgementMessage;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.OutNetworkObject;
+
+/**
+ * Inbound handler that handles incoming acknowledgement messages and sends
acknowledgement messages for other messages.
+ */
+public class InboundRecoveryHandler extends ChannelInboundHandlerAdapter {
+ /** Handler name. */
+ public static final String NAME = "inbound-recovery-handler";
+
+ /** Recovery descriptor. */
+ private final RecoveryDescriptor descriptor;
+
+ /** Messages factory. */
+ private final NetworkMessagesFactory factory;
+
+ /**
+ * Constructor.
+ *
+ * @param descriptor Recovery descriptor.
+ * @param factory Message factory.
+ */
+ public InboundRecoveryHandler(RecoveryDescriptor descriptor,
NetworkMessagesFactory factory) {
+ this.descriptor = descriptor;
+ this.factory = factory;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
+ NetworkMessage message = (NetworkMessage) msg;
+
+ if (message instanceof AcknowledgementMessage) {
+ AcknowledgementMessage ackMessage = (AcknowledgementMessage) msg;
+ long receivedMessages = ackMessage.receivedMessages();
+
+ descriptor.acknowledge(receivedMessages);
+ } else if (message.needAck()) {
+ AcknowledgementMessage ackMsg = factory.acknowledgementMessage()
Review Comment:
I believe we should revisit it, maybe not here. Node should passively send
its counter every N messages or when idling. I would be cool to compare current
approach with a "new" one, by measuring throughput and latency percentiles.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]