sashapolo commented on a change in pull request #575:
URL: https://github.com/apache/ignite-3/pull/575#discussion_r790695474



##########
File path: 
modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
##########
@@ -51,24 +51,26 @@
     CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg);
 
     /**
-     * Same as {@link #send(ClusterNode, NetworkMessage)} but attaches the 
given correlation ID to the given message.
+     * Sends a response to the invocation request.
+     * Guarantees are the same as for the {@link #send(ClusterNode, 
NetworkMessage)}.
      *
      * @param recipient     Recipient of the message.
      * @param msg           Message which should be delivered.
      * @param correlationId Correlation id when replying to the request.
      * @return Future of the send operation.
      */
-    CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg, 
String correlationId);
+    CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, 
long correlationId);
 
     /**
-     * Same as {@link #send(ClusterNode, NetworkMessage)} but attaches the 
given correlation ID to the given message.
+     * Sends a response to the invocation request.

Review comment:
       same here

##########
File path: 
modules/network-api/src/main/java/org/apache/ignite/network/NetworkMessageHandler.java
##########
@@ -28,5 +28,5 @@
      * @param senderAddr    Sender address. Use {@link 
TopologyService#getByAddress} to resolve the corresponding {@link ClusterNode}.
      * @param correlationId Correlation id. Used to track correspondence 
between requests and responses.
      */
-    void onReceived(NetworkMessage message, NetworkAddress senderAddr, String 
correlationId);
+    void onReceived(NetworkMessage message, NetworkAddress senderAddr, Long 
correlationId);

Review comment:
       Why `Long` and not `long`? I guess because `correlationId` can be `null`? 
If so, please state that in the Javadoc and add a `@Nullable` annotation to the parameter.
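
       To illustrate the request above, here is a minimal sketch of a handler 
whose Javadoc and annotation both document the nullable id. `MessageHandler`, 
`NullableCorrelationIdExample`, and the local `Nullable` annotation are 
hypothetical stand-ins (the real code would use 
`org.jetbrains.annotations.Nullable`), and plain strings replace 
`NetworkMessage`:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

/** Stand-in for org.jetbrains.annotations.Nullable so that the sketch compiles on its own. */
@Target(ElementType.PARAMETER)
@interface Nullable {}

/** Hypothetical, simplified handler; the real interface takes a NetworkMessage and a NetworkAddress. */
interface MessageHandler {
    /**
     * Invoked when a message is received.
     *
     * @param message       Received message.
     * @param correlationId Correlation id, or {@code null} if the message is not an invocation request.
     */
    void onReceived(String message, @Nullable Long correlationId);
}

class NullableCorrelationIdExample {
    /** Dispatches a message to a handler that distinguishes one-way messages from requests. */
    static String dispatch(String message, @Nullable Long correlationId) {
        StringBuilder log = new StringBuilder();

        MessageHandler handler = (msg, id) ->
                log.append(id == null ? "one-way: " + msg : "request #" + id + ": " + msg);

        handler.onReceived(message, correlationId);

        return log.toString();
    }
}
```

       With the contract written down, callers know they must check for `null` 
before unboxing the id.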

##########
File path: 
modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java
##########
@@ -51,24 +51,26 @@
     CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg);
 
     /**
-     * Same as {@link #send(ClusterNode, NetworkMessage)} but attaches the 
given correlation ID to the given message.
+     * Sends a response to the invocation request.

Review comment:
       ```suggestion
        * Sends a response to a {@link #invoke} request.
   ```

##########
File path: 
modules/network-api/src/main/java/org/apache/ignite/network/NetworkAddress.java
##########
@@ -36,6 +36,9 @@
     /** Port. */
     private final int port;
 
+    /** Consistent id. TODO: IGNITE-16373 Temporal until ticket is not 
resolved. */

Review comment:
       ```suggestion
       /** Consistent id. TODO: IGNITE-16373 Temporary until the ticket is resolved. */
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/internal/network/handshake/HandshakeResult.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.network.handshake;
+
+import java.util.UUID;
+
+/**
+ * Handshake result.
+ */
+public class HandshakeResult {
+    /** Handshake action. */
+    private final HandshakeAction action;
+
+    /** Remote node consistent id. */
+    private final String consistentId;
+
+    /** Remote node launch id. */
+    private final UUID launchId;
+
+    /**
+     * Constructor for a successful handshake.

Review comment:
       I would suggest using static factory methods instead of constructors.
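
       A rough sketch of what that could look like, assuming one factory 
method per handshake outcome. The `HandshakeAction` values and the factory 
method names below are hypothetical, since the enum and the constructors are 
not visible in this hunk:

```java
import java.util.UUID;

/** Hypothetical action values; the real HandshakeAction enum is not shown in this diff. */
enum HandshakeAction { NOOP, REMOVE_HANDLER, FAIL }

/** Sketch of a handshake result that can only be created through named factory methods. */
final class HandshakeResult {
    private final HandshakeAction action;

    /** Remote node consistent id, null unless the handshake succeeded. */
    private final String consistentId;

    /** Remote node launch id, null unless the handshake succeeded. */
    private final UUID launchId;

    /** Private constructor: callers have to go through the factory methods. */
    private HandshakeResult(HandshakeAction action, String consistentId, UUID launchId) {
        this.action = action;
        this.consistentId = consistentId;
        this.launchId = launchId;
    }

    /** Successful handshake: the handler can be removed from the pipeline. */
    static HandshakeResult removeHandler(UUID launchId, String consistentId) {
        return new HandshakeResult(HandshakeAction.REMOVE_HANDLER, consistentId, launchId);
    }

    /** Failed handshake. */
    static HandshakeResult fail() {
        return new HandshakeResult(HandshakeAction.FAIL, null, null);
    }

    /** Handshake still in progress, nothing to do yet. */
    static HandshakeResult noOp() {
        return new HandshakeResult(HandshakeAction.NOOP, null, null);
    }

    HandshakeAction action() {
        return action;
    }

    String consistentId() {
        return consistentId;
    }

    UUID launchId() {
        return launchId;
    }
}
```

       The method names state the outcome at the call site, and invalid 
combinations (for example a failed handshake with a launch id) cannot be 
constructed.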

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.InvokeRequest;
+import org.apache.ignite.internal.network.message.InvokeResponse;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/** Default messaging service implementation. */
+public class DefaultMessagingService extends AbstractMessagingService {
+    /** Network messages factory. */
+    private final NetworkMessagesFactory factory = new 
NetworkMessagesFactory();
+
+    /** Topology service. */
+    private final TopologyService topologyService;
+
+    /** Connection manager that provides access to {@link NettySender}. */
+    private volatile ConnectionManager connectionManager;
+
+    /**
+     * This node's local socket address. Not volatile because it's piggybacked 
by the {@link #connectionManager} and connection manager
+     * is always accessed before the localAddress.
+     */
+    private InetSocketAddress localAddress;
+
+    /** Collection that maps correlation id to the future for an invocation 
request. */
+    private final ConcurrentMap<Long, CompletableFuture<NetworkMessage>> 
requestsMap = new ConcurrentHashMap<>();
+
+    /** Correlation id generator. */
+    private final AtomicLong correlationIdGenerator = new AtomicLong();
+
+    /** Fake host for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final String UNKNOWN_HOST = "unknown";
+
+    /** Fake port for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final int UNKNOWN_HOST_PORT = 1337;
+
+    /**
+     * Constructor.
+     *
+     * @param topologyService Toplogy service.
+     */
+    public DefaultMessagingService(TopologyService topologyService) {
+        this.topologyService = topologyService;
+    }
+
+    /**
+     * Resolves cyclic dependency and sets up the connection manager.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public void setConnectionManager(ConnectionManager connectionManager) {
+        this.localAddress = (InetSocketAddress) 
connectionManager.getLocalAddress();
+        this.connectionManager = connectionManager;
+        connectionManager.addListener(this::onMessage);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void weakSend(ClusterNode recipient, NetworkMessage msg) {
+        send(recipient, msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage 
msg) {
+        return send0(recipient, recipient.address(), msg, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(ClusterNode recipient, 
NetworkMessage msg, long correlationId) {
+        return send0(recipient, recipient.address(), msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(NetworkAddress addr, NetworkMessage 
msg, long correlationId) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+        return send0(recipient, addr, msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, 
NetworkMessage msg, long timeout) {
+        return invoke0(recipient, recipient.address(), msg, timeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, 
NetworkMessage msg, long timeout) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+
+        return invoke0(recipient, addr, msg, timeout);
+    }
+
+    /**
+     * Sends a message. If the target is the current node, then message will 
be delivered immediately.
+     *
+     * @param recipient Target cluster node. TODO: Maybe {@code null} due to 
IGNITE-16373.
+     * @param address Target address.
+     * @param msg Message.
+     * @param correlationId Correlation id. Not null iff the message is a 
response to the invocation request.
+     * @return Future of the send operation.
+     */
+    private CompletableFuture<Void> send0(@Nullable ClusterNode recipient, 
NetworkAddress address, NetworkMessage msg,
+            @Nullable Long correlationId) {
+        if (connectionManager.isStopped()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        InetSocketAddress addr = new InetSocketAddress(address.host(), 
address.port());
+
+        if (isSelf(recipient, address.consistentId(), addr)) {
+            if (correlationId != null) {
+                onInvokeResponse(msg, correlationId);
+            } else {
+                sendToSelf(msg, null);
+            }
+
+            return CompletableFuture.completedFuture(null);
+        }
+
+        NetworkMessage message = msg;
+
+        if (correlationId != null) {
+            message = responseFromMessage(msg, correlationId);
+        }
+
+        final NetworkMessage msg0 = message;

Review comment:
       ```suggestion
           NetworkMessage msg0 = message;
   ```
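
       For context on the suggestion above: a local variable captured by a 
lambda only has to be effectively final, so the explicit `final` modifier is 
redundant. A minimal standalone sketch, in which `EffectivelyFinalExample` and 
the string-based "channel" are hypothetical and stand in for the 
connection manager:

```java
import java.util.concurrent.CompletableFuture;

class EffectivelyFinalExample {
    /** Mimics the shape of send0: optionally wraps the message, then sends it asynchronously. */
    static CompletableFuture<String> send(String msg, Long correlationId) {
        String message = msg;

        if (correlationId != null) {
            message = "response[" + correlationId + "]: " + msg;
        }

        // `message` may be reassigned above, so a lambda cannot capture it.
        // `msg0` is assigned exactly once, which makes it effectively final
        // even without the `final` keyword.
        String msg0 = message;

        return CompletableFuture.completedFuture("channel")
                .thenCompose(sender -> CompletableFuture.completedFuture(sender + " <- " + msg0));
    }
}
```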

##########
File path: 
modules/network-api/src/main/java/org/apache/ignite/network/NetworkAddress.java
##########
@@ -36,6 +36,9 @@
     /** Port. */
     private final int port;
 
+    /** Consistent id. TODO: IGNITE-16373 Temporal until ticket is not 
resolved. */
+    private final String consistentId;

Review comment:
       Should be marked as `@Nullable`

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
##########
@@ -231,4 +213,20 @@ protected ClusterConfig 
clusterConfig(ClusterMembershipView cfg) {
                 .map(addr -> Address.create(addr.host(), addr.port()))
                 .collect(Collectors.toList());
     }
+
+    /**
+     * Creates everything that is needed for the user object serialization.
+     *
+     * @return User object serialization context.
+     */
+    @NotNull
+    private static UserObjectSerializationContext 
createUserObjectSerializationContext() {

Review comment:
       Why did you move this method?

##########
File path: 
modules/network-api/src/main/java/org/apache/ignite/network/TopologyService.java
##########
@@ -54,4 +54,12 @@
      * @return The node or {@code null} if the node is not yet discovered or 
dead.
      */
     @Nullable ClusterNode getByAddress(NetworkAddress addr);
+
+    /**
+     * Returns a cluster node by it's consistent id..

Review comment:
       ```suggestion
        * Returns a cluster node by its consistent id.
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.InvokeRequest;
+import org.apache.ignite.internal.network.message.InvokeResponse;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/** Default messaging service implementation. */
+public class DefaultMessagingService extends AbstractMessagingService {
+    /** Network messages factory. */
+    private final NetworkMessagesFactory factory = new 
NetworkMessagesFactory();
+
+    /** Topology service. */
+    private final TopologyService topologyService;
+
+    /** Connection manager that provides access to {@link NettySender}. */
+    private volatile ConnectionManager connectionManager;
+
+    /**
+     * This node's local socket address. Not volatile because it's piggybacked 
by the {@link #connectionManager} and connection manager
+     * is always accessed before the localAddress.
+     */
+    private InetSocketAddress localAddress;
+
+    /** Collection that maps correlation id to the future for an invocation 
request. */
+    private final ConcurrentMap<Long, CompletableFuture<NetworkMessage>> 
requestsMap = new ConcurrentHashMap<>();
+
+    /** Correlation id generator. */
+    private final AtomicLong correlationIdGenerator = new AtomicLong();
+
+    /** Fake host for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final String UNKNOWN_HOST = "unknown";
+
+    /** Fake port for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final int UNKNOWN_HOST_PORT = 1337;
+
+    /**
+     * Constructor.
+     *
+     * @param topologyService Toplogy service.
+     */
+    public DefaultMessagingService(TopologyService topologyService) {
+        this.topologyService = topologyService;
+    }
+
+    /**
+     * Resolves cyclic dependency and sets up the connection manager.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public void setConnectionManager(ConnectionManager connectionManager) {
+        this.localAddress = (InetSocketAddress) 
connectionManager.getLocalAddress();
+        this.connectionManager = connectionManager;
+        connectionManager.addListener(this::onMessage);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void weakSend(ClusterNode recipient, NetworkMessage msg) {
+        send(recipient, msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage 
msg) {
+        return send0(recipient, recipient.address(), msg, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(ClusterNode recipient, 
NetworkMessage msg, long correlationId) {
+        return send0(recipient, recipient.address(), msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(NetworkAddress addr, NetworkMessage 
msg, long correlationId) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+        return send0(recipient, addr, msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, 
NetworkMessage msg, long timeout) {
+        return invoke0(recipient, recipient.address(), msg, timeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, 
NetworkMessage msg, long timeout) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+
+        return invoke0(recipient, addr, msg, timeout);
+    }
+
+    /**
+     * Sends a message. If the target is the current node, then message will 
be delivered immediately.
+     *
+     * @param recipient Target cluster node. TODO: Maybe {@code null} due to 
IGNITE-16373.
+     * @param address Target address.
+     * @param msg Message.
+     * @param correlationId Correlation id. Not null iff the message is a 
response to the invocation request.
+     * @return Future of the send operation.
+     */
+    private CompletableFuture<Void> send0(@Nullable ClusterNode recipient, 
NetworkAddress address, NetworkMessage msg,
+            @Nullable Long correlationId) {
+        if (connectionManager.isStopped()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        InetSocketAddress addr = new InetSocketAddress(address.host(), 
address.port());
+
+        if (isSelf(recipient, address.consistentId(), addr)) {
+            if (correlationId != null) {
+                onInvokeResponse(msg, correlationId);
+            } else {
+                sendToSelf(msg, null);
+            }
+
+            return CompletableFuture.completedFuture(null);
+        }
+
+        NetworkMessage message = msg;
+
+        if (correlationId != null) {
+            message = responseFromMessage(msg, correlationId);
+        }
+
+        final NetworkMessage msg0 = message;
+
+        String recipientConsistentId;
+
+        if (recipient != null) {
+            recipientConsistentId = recipient.name();
+        } else {
+            recipientConsistentId = address.consistentId();
+        }
+
+        return connectionManager.channel(recipientConsistentId, 
addr).thenCompose(sender -> sender.send(msg0));
+    }
+
+    /**
+     * Sends an invocation request. If the target is the current node, then 
message will be delivered immediately.
+     *
+     * @param recipient Target cluster node. TODO: Maybe {@code null} due to 
IGNITE-16373.
+     * @param addr Target address.
+     * @param msg Message.
+     * @param timeout Invocation timeout.
+     * @return A future holding the response or error if the expected response 
was not received.
+     */
+    private CompletableFuture<NetworkMessage> invoke0(@Nullable ClusterNode 
recipient, NetworkAddress addr,
+            NetworkMessage msg, long timeout) {
+        if (connectionManager.isStopped()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        long correlationId = createCorrelationId();
+
+        CompletableFuture<NetworkMessage> responseFuture = new 
CompletableFuture<NetworkMessage>()
+                .orTimeout(timeout, TimeUnit.MILLISECONDS);
+
+        requestsMap.put(correlationId, responseFuture);
+
+        InetSocketAddress address = new InetSocketAddress(addr.host(), 
addr.port());
+
+        if (isSelf(recipient, addr.consistentId(), address)) {
+            sendToSelf(msg, correlationId);
+            return responseFuture;
+        }
+
+        InvokeRequest message = requestFromMessage(msg, correlationId);
+
+        String recipientConsistentId;
+
+        if (recipient != null) {
+            recipientConsistentId = recipient.name();
+        } else {
+            recipientConsistentId = addr.consistentId();
+        }
+
+        return connectionManager.channel(recipientConsistentId, 
address).thenCompose(sender -> sender.send(message))
+                .thenCompose(unused -> responseFuture);
+    }
+
+    /**
+     * Sends a message to the current node.
+     *
+     * @param msg Message.
+     * @param correlationId Correlation id.
+     */
+    private void sendToSelf(NetworkMessage msg, Long correlationId) {
+        var address = new NetworkAddress(localAddress.getHostName(), 
localAddress.getPort(), connectionManager.consistentId());
+
+        for (NetworkMessageHandler networkMessageHandler : 
getMessageHandlers(msg.groupType())) {
+            networkMessageHandler.onReceived(msg, address, correlationId);
+        }
+    }
+
+    /**
+     * Handles an incoming messages.
+     *
+     * @param consistentId Sender's consistent id.
+     * @param msg Incoming message.
+     */
+    private void onMessage(String consistentId, NetworkMessage msg) {
+        if (msg instanceof ScaleCubeMessage) {
+            // ScaleCube messages are handled in the ScaleCubeTransport
+            return;
+        }
+
+        if (msg instanceof InvokeResponse) {
+            // Handle invocation response
+            InvokeResponse response = (InvokeResponse) msg;
+            onInvokeResponse(response.message(), response.correlationId());
+            return;
+        }
+
+        Long correlationId = null;
+        NetworkMessage message = msg;
+
+        if (msg instanceof InvokeRequest) {
+            // Unwrap invocation request
+            InvokeRequest messageWithCorrelation = (InvokeRequest) msg;
+            correlationId = messageWithCorrelation.correlationId();
+            message = messageWithCorrelation.message();
+        }
+
+        ClusterNode sender = topologyService.getByConsistentId(consistentId);
+
+        NetworkAddress senderAddress;
+
+        if (sender != null) {
+            senderAddress = sender.address();
+        } else {
+            // TODO: IGNITE-16373 Use fake address if sender is not in cluster 
yet. For the response, consistentId from this address will
+            // be used
+            senderAddress = new NetworkAddress(UNKNOWN_HOST, 
UNKNOWN_HOST_PORT, consistentId);
+        }
+
+        for (NetworkMessageHandler networkMessageHandler : 
getMessageHandlers(message.groupType())) {
+            // TODO: IGNITE-16373 We should pass ClusterNode and not the 
address
+            networkMessageHandler.onReceived(message, senderAddress, 
correlationId);
+        }
+    }
+
+    /**
+     * Handles a response to an invocation request.
+     *
+     * @param response Response message.
+     * @param correlationId Request's correlation id.
+     */
+    private void onInvokeResponse(NetworkMessage response, long correlationId) 
{

Review comment:
       Actually, it would be better to have `Long` here as well, in order to 
avoid boxing in `remove`.
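
       A small sketch of the difference, assuming the method body looks the 
id up with `requestsMap.remove(correlationId)` (the body is not visible in this 
hunk). `PendingRequests` and its methods are hypothetical, and strings replace 
`NetworkMessage`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class PendingRequests {
    private final ConcurrentMap<Long, CompletableFuture<String>> requestsMap = new ConcurrentHashMap<>();

    /** Registers a pending request; the primitive id is boxed once when it is used as a key. */
    CompletableFuture<String> register(long correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();

        requestsMap.put(correlationId, future);

        return future;
    }

    /** Primitive parameter: the id is boxed again via Long.valueOf on every call to remove. */
    boolean completePrimitive(long correlationId, String response) {
        CompletableFuture<String> future = requestsMap.remove(correlationId);

        return future != null && future.complete(response);
    }

    /** Boxed parameter: a caller that already holds a Long passes it through without re-boxing. */
    boolean completeBoxed(Long correlationId, String response) {
        CompletableFuture<String> future = requestsMap.remove(correlationId);

        return future != null && future.complete(response);
    }
}
```

       Note that `Long.valueOf` caches small values (-128 to 127), so the extra 
allocation only matters once the generated ids grow beyond that range.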

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.InvokeRequest;
+import org.apache.ignite.internal.network.message.InvokeResponse;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/** Default messaging service implementation. */
+public class DefaultMessagingService extends AbstractMessagingService {
+    /** Network messages factory. */
+    private final NetworkMessagesFactory factory = new 
NetworkMessagesFactory();
+
+    /** Topology service. */
+    private final TopologyService topologyService;
+
+    /** Connection manager that provides access to {@link NettySender}. */
+    private volatile ConnectionManager connectionManager;
+
+    /**
+     * This node's local socket address. Not volatile because it's piggybacked 
by the {@link #connectionManager} and connection manager
+     * is always accessed before the localAddress.
+     */
+    private InetSocketAddress localAddress;
+
+    /** Collection that maps correlation id to the future for an invocation 
request. */
+    private final ConcurrentMap<Long, CompletableFuture<NetworkMessage>> 
requestsMap = new ConcurrentHashMap<>();
+
+    /** Correlation id generator. */
+    private final AtomicLong correlationIdGenerator = new AtomicLong();
+
+    /** Fake host for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final String UNKNOWN_HOST = "unknown";
+
+    /** Fake port for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final int UNKNOWN_HOST_PORT = 1337;
+
+    /**
+     * Constructor.
+     *
+     * @param topologyService Toplogy service.
+     */
+    public DefaultMessagingService(TopologyService topologyService) {
+        this.topologyService = topologyService;
+    }
+
+    /**
+     * Resolves cyclic dependency and sets up the connection manager.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public void setConnectionManager(ConnectionManager connectionManager) {
+        this.localAddress = (InetSocketAddress) 
connectionManager.getLocalAddress();
+        this.connectionManager = connectionManager;
+        connectionManager.addListener(this::onMessage);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void weakSend(ClusterNode recipient, NetworkMessage msg) {
+        send(recipient, msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage 
msg) {
+        return send0(recipient, recipient.address(), msg, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(ClusterNode recipient, 
NetworkMessage msg, long correlationId) {
+        return send0(recipient, recipient.address(), msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(NetworkAddress addr, NetworkMessage 
msg, long correlationId) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+        return send0(recipient, addr, msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, 
NetworkMessage msg, long timeout) {
+        return invoke0(recipient, recipient.address(), msg, timeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, 
NetworkMessage msg, long timeout) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+
+        return invoke0(recipient, addr, msg, timeout);
+    }
+
+    /**
+     * Sends a message. If the target is the current node, then message will 
be delivered immediately.
+     *
+     * @param recipient Target cluster node. TODO: Maybe {@code null} due to 
IGNITE-16373.
+     * @param address Target address.
+     * @param msg Message.
+     * @param correlationId Correlation id. Not null iff the message is a 
response to the invocation request.
+     * @return Future of the send operation.
+     */
+    private CompletableFuture<Void> send0(@Nullable ClusterNode recipient, 
NetworkAddress address, NetworkMessage msg,
+            @Nullable Long correlationId) {
+        if (connectionManager.isStopped()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        InetSocketAddress addr = new InetSocketAddress(address.host(), 
address.port());
+
+        if (isSelf(recipient, address.consistentId(), addr)) {
+            if (correlationId != null) {
+                onInvokeResponse(msg, correlationId);
+            } else {
+                sendToSelf(msg, null);
+            }
+
+            return CompletableFuture.completedFuture(null);
+        }
+
+        NetworkMessage message = msg;
+
+        if (correlationId != null) {
+            message = responseFromMessage(msg, correlationId);
+        }
+
+        final NetworkMessage msg0 = message;
+
+        String recipientConsistentId;
+
+        if (recipient != null) {
+            recipientConsistentId = recipient.name();
+        } else {
+            recipientConsistentId = address.consistentId();
+        }
+
+        return connectionManager.channel(recipientConsistentId, 
addr).thenCompose(sender -> sender.send(msg0));
+    }
+
+    /**
+     * Sends an invocation request. If the target is the current node, then 
message will be delivered immediately.
+     *
+     * @param recipient Target cluster node. TODO: Maybe {@code null} due to 
IGNITE-16373.
+     * @param addr Target address.
+     * @param msg Message.
+     * @param timeout Invocation timeout.
+     * @return A future holding the response or error if the expected response 
was not received.
+     */
+    private CompletableFuture<NetworkMessage> invoke0(@Nullable ClusterNode 
recipient, NetworkAddress addr,
+            NetworkMessage msg, long timeout) {
+        if (connectionManager.isStopped()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        long correlationId = createCorrelationId();
+
+        CompletableFuture<NetworkMessage> responseFuture = new 
CompletableFuture<NetworkMessage>()
+                .orTimeout(timeout, TimeUnit.MILLISECONDS);
+
+        requestsMap.put(correlationId, responseFuture);
+
+        InetSocketAddress address = new InetSocketAddress(addr.host(), 
addr.port());
+
+        if (isSelf(recipient, addr.consistentId(), address)) {
+            sendToSelf(msg, correlationId);
+            return responseFuture;
+        }
+
+        InvokeRequest message = requestFromMessage(msg, correlationId);
+
+        String recipientConsistentId;
+
+        if (recipient != null) {
+            recipientConsistentId = recipient.name();
+        } else {
+            recipientConsistentId = addr.consistentId();
+        }
+
+        return connectionManager.channel(recipientConsistentId, 
address).thenCompose(sender -> sender.send(message))
+                .thenCompose(unused -> responseFuture);
+    }
+
+    /**
+     * Sends a message to the current node.
+     *
+     * @param msg Message.
+     * @param correlationId Correlation id.
+     */
+    private void sendToSelf(NetworkMessage msg, Long correlationId) {
+        var address = new NetworkAddress(localAddress.getHostName(), 
localAddress.getPort(), connectionManager.consistentId());
+
+        for (NetworkMessageHandler networkMessageHandler : 
getMessageHandlers(msg.groupType())) {
+            networkMessageHandler.onReceived(msg, address, correlationId);
+        }
+    }
+
+    /**
+     * Handles an incoming messages.
+     *
+     * @param consistentId Sender's consistent id.
+     * @param msg Incoming message.
+     */
+    private void onMessage(String consistentId, NetworkMessage msg) {
+        if (msg instanceof ScaleCubeMessage) {
+            // ScaleCube messages are handled in the ScaleCubeTransport
+            return;
+        }
+
+        if (msg instanceof InvokeResponse) {
+            // Handle invocation response
+            InvokeResponse response = (InvokeResponse) msg;
+            onInvokeResponse(response.message(), response.correlationId());
+            return;
+        }
+
+        Long correlationId = null;
+        NetworkMessage message = msg;
+
+        if (msg instanceof InvokeRequest) {
+            // Unwrap invocation request
+            InvokeRequest messageWithCorrelation = (InvokeRequest) msg;
+            correlationId = messageWithCorrelation.correlationId();
+            message = messageWithCorrelation.message();
+        }
+
+        ClusterNode sender = topologyService.getByConsistentId(consistentId);
+
+        NetworkAddress senderAddress;
+
+        if (sender != null) {
+            senderAddress = sender.address();
+        } else {
+            // TODO: IGNITE-16373 Use fake address if sender is not in cluster 
yet. For the response, consistentId from this address will
+            // be used

Review comment:
       ```suggestion
               //  be used
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.InvokeRequest;
+import org.apache.ignite.internal.network.message.InvokeResponse;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/** Default messaging service implementation. */
+public class DefaultMessagingService extends AbstractMessagingService {
+    /** Network messages factory. */
+    private final NetworkMessagesFactory factory = new 
NetworkMessagesFactory();
+
+    /** Topology service. */
+    private final TopologyService topologyService;
+
+    /** Connection manager that provides access to {@link NettySender}. */
+    private volatile ConnectionManager connectionManager;
+
+    /**
+     * This node's local socket address. Not volatile because it's piggybacked 
by the {@link #connectionManager} and connection manager
+     * is always accessed before the localAddress.
+     */
+    private InetSocketAddress localAddress;
+
+    /** Collection that maps correlation id to the future for an invocation 
request. */
+    private final ConcurrentMap<Long, CompletableFuture<NetworkMessage>> 
requestsMap = new ConcurrentHashMap<>();
+
+    /** Correlation id generator. */
+    private final AtomicLong correlationIdGenerator = new AtomicLong();
+
+    /** Fake host for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final String UNKNOWN_HOST = "unknown";
+
+    /** Fake port for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final int UNKNOWN_HOST_PORT = 1337;
+
+    /**
+     * Constructor.
+     *
+     * @param topologyService Toplogy service.
+     */
+    public DefaultMessagingService(TopologyService topologyService) {
+        this.topologyService = topologyService;
+    }
+
+    /**
+     * Resolves cyclic dependency and sets up the connection manager.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public void setConnectionManager(ConnectionManager connectionManager) {
+        this.localAddress = (InetSocketAddress) 
connectionManager.getLocalAddress();
+        this.connectionManager = connectionManager;
+        connectionManager.addListener(this::onMessage);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void weakSend(ClusterNode recipient, NetworkMessage msg) {
+        send(recipient, msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage 
msg) {
+        return send0(recipient, recipient.address(), msg, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(ClusterNode recipient, 
NetworkMessage msg, long correlationId) {
+        return send0(recipient, recipient.address(), msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(NetworkAddress addr, NetworkMessage 
msg, long correlationId) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+        return send0(recipient, addr, msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, 
NetworkMessage msg, long timeout) {
+        return invoke0(recipient, recipient.address(), msg, timeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, 
NetworkMessage msg, long timeout) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+
+        return invoke0(recipient, addr, msg, timeout);
+    }
+
+    /**
+     * Sends a message. If the target is the current node, then message will 
be delivered immediately.
+     *
+     * @param recipient Target cluster node. TODO: Maybe {@code null} due to 
IGNITE-16373.
+     * @param address Target address.
+     * @param msg Message.
+     * @param correlationId Correlation id. Not null iff the message is a 
response to the invocation request.
+     * @return Future of the send operation.
+     */
+    private CompletableFuture<Void> send0(@Nullable ClusterNode recipient, 
NetworkAddress address, NetworkMessage msg,
+            @Nullable Long correlationId) {
+        if (connectionManager.isStopped()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        InetSocketAddress addr = new InetSocketAddress(address.host(), 
address.port());
+
+        if (isSelf(recipient, address.consistentId(), addr)) {
+            if (correlationId != null) {
+                onInvokeResponse(msg, correlationId);
+            } else {
+                sendToSelf(msg, null);
+            }
+
+            return CompletableFuture.completedFuture(null);
+        }
+
+        NetworkMessage message = msg;
+
+        if (correlationId != null) {
+            message = responseFromMessage(msg, correlationId);
+        }
+
+        final NetworkMessage msg0 = message;
+
+        String recipientConsistentId;
+
+        if (recipient != null) {
+            recipientConsistentId = recipient.name();
+        } else {
+            recipientConsistentId = address.consistentId();
+        }
+
+        return connectionManager.channel(recipientConsistentId, 
addr).thenCompose(sender -> sender.send(msg0));
+    }
+
+    /**
+     * Sends an invocation request. If the target is the current node, then 
message will be delivered immediately.
+     *
+     * @param recipient Target cluster node. TODO: Maybe {@code null} due to 
IGNITE-16373.
+     * @param addr Target address.
+     * @param msg Message.
+     * @param timeout Invocation timeout.
+     * @return A future holding the response or error if the expected response 
was not received.
+     */
+    private CompletableFuture<NetworkMessage> invoke0(@Nullable ClusterNode 
recipient, NetworkAddress addr,
+            NetworkMessage msg, long timeout) {
+        if (connectionManager.isStopped()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        long correlationId = createCorrelationId();
+
+        CompletableFuture<NetworkMessage> responseFuture = new 
CompletableFuture<NetworkMessage>()
+                .orTimeout(timeout, TimeUnit.MILLISECONDS);
+
+        requestsMap.put(correlationId, responseFuture);
+
+        InetSocketAddress address = new InetSocketAddress(addr.host(), 
addr.port());
+
+        if (isSelf(recipient, addr.consistentId(), address)) {
+            sendToSelf(msg, correlationId);
+            return responseFuture;
+        }
+
+        InvokeRequest message = requestFromMessage(msg, correlationId);
+
+        String recipientConsistentId;
+
+        if (recipient != null) {
+            recipientConsistentId = recipient.name();
+        } else {
+            recipientConsistentId = addr.consistentId();
+        }
+
+        return connectionManager.channel(recipientConsistentId, 
address).thenCompose(sender -> sender.send(message))
+                .thenCompose(unused -> responseFuture);
+    }
+
+    /**
+     * Sends a message to the current node.
+     *
+     * @param msg Message.
+     * @param correlationId Correlation id.
+     */
+    private void sendToSelf(NetworkMessage msg, Long correlationId) {
+        var address = new NetworkAddress(localAddress.getHostName(), 
localAddress.getPort(), connectionManager.consistentId());
+
+        for (NetworkMessageHandler networkMessageHandler : 
getMessageHandlers(msg.groupType())) {
+            networkMessageHandler.onReceived(msg, address, correlationId);
+        }
+    }
+
+    /**
+     * Handles an incoming messages.
+     *
+     * @param consistentId Sender's consistent id.
+     * @param msg Incoming message.
+     */
+    private void onMessage(String consistentId, NetworkMessage msg) {
+        if (msg instanceof ScaleCubeMessage) {
+            // ScaleCube messages are handled in the ScaleCubeTransport
+            return;
+        }
+
+        if (msg instanceof InvokeResponse) {
+            // Handle invocation response
+            InvokeResponse response = (InvokeResponse) msg;
+            onInvokeResponse(response.message(), response.correlationId());
+            return;
+        }
+
+        Long correlationId = null;
+        NetworkMessage message = msg;
+
+        if (msg instanceof InvokeRequest) {
+            // Unwrap invocation request
+            InvokeRequest messageWithCorrelation = (InvokeRequest) msg;
+            correlationId = messageWithCorrelation.correlationId();
+            message = messageWithCorrelation.message();
+        }
+
+        ClusterNode sender = topologyService.getByConsistentId(consistentId);
+
+        NetworkAddress senderAddress;
+
+        if (sender != null) {
+            senderAddress = sender.address();
+        } else {
+            // TODO: IGNITE-16373 Use fake address if sender is not in cluster 
yet. For the response, consistentId from this address will
+            // be used
+            senderAddress = new NetworkAddress(UNKNOWN_HOST, 
UNKNOWN_HOST_PORT, consistentId);
+        }
+
+        for (NetworkMessageHandler networkMessageHandler : 
getMessageHandlers(message.groupType())) {
+            // TODO: IGNITE-16373 We should pass ClusterNode and not the 
address
+            networkMessageHandler.onReceived(message, senderAddress, 
correlationId);
+        }
+    }
+
+    /**
+     * Handles a response to an invocation request.
+     *
+     * @param response Response message.
+     * @param correlationId Request's correlation id.
+     */
+    private void onInvokeResponse(NetworkMessage response, long correlationId) 
{
+        CompletableFuture<NetworkMessage> responseFuture = 
requestsMap.remove(correlationId);
+        if (responseFuture != null) {
+            responseFuture.complete(response);
+        }
+    }
+
+    /**
+     * Creates an {@link InvokeRequest} from a message and a correlation id.
+     *
+     * @param message Message.
+     * @param correlationId Correlation id.
+     * @return Invoke request message.
+     */
+    private InvokeRequest requestFromMessage(NetworkMessage message, long 
correlationId) {
+        return 
factory.invokeRequest().correlationId(correlationId).message(message).build();
+    }
+
+    /**
+     * Creates an {@link InvokeResponse} from a message and a correlation id.
+     *
+     * @param message Message.
+     * @param correlationId Correlation id.
+     * @return Invoke response message.
+     */
+    private InvokeResponse responseFromMessage(NetworkMessage message, long 
correlationId) {
+        return 
factory.invokeResponse().correlationId(correlationId).message(message).build();
+    }
+
+    /**
+     * Creates a correlation id for an invocation request.
+     *
+     * @return New correlation id.
+     */
+    private long createCorrelationId() {
+        return correlationIdGenerator.getAndIncrement();
+    }
+
+    /**
+     * Checks if the target is the current node.
+     *
+     * @param target Target cluster node. TODO: IGNITE-16373 May be {@code 
null} due to the ticket.
+     * @param targetSocketAddress Target's socket address.
+     * @return {@code true} if the target is the current node, {@code false} 
otherwise.
+     */
+    private boolean isSelf(@Nullable ClusterNode target, @Nullable String 
consistentId, SocketAddress targetSocketAddress) {
+        String cid = consistentId;
+
+        if (cid == null && target != null) {
+            cid = target.name();
+        }
+
+        if (cid != null) {
+            return connectionManager.consistentId().equals(cid);
+        }
+
+        if (Objects.equals(localAddress, targetSocketAddress)) {
+            return true;
+        }
+
+        InetSocketAddress targetInetSocketAddress = (InetSocketAddress) 
targetSocketAddress;
+
+        assert !targetInetSocketAddress.getHostName().equals(UNKNOWN_HOST) && 
targetInetSocketAddress.getPort() != UNKNOWN_HOST_PORT;
+
+        InetAddress targetInetAddress = targetInetSocketAddress.getAddress();
+        if (targetInetAddress.isAnyLocalAddress() || 
targetInetAddress.isLoopbackAddress()) {
+            return targetInetSocketAddress.getPort() == localAddress.getPort();
+        }
+        return false;
+    }
+
+    /**
+     * Stops the messaging service.
+     */
+    public void stop() {
+        requestsMap.values().forEach(fut -> {
+            fut.completeExceptionally(new NodeStoppingException());

Review comment:
       Some minor improvements:
   1. You can construct the exception only once.
   2. The lambda can be a single expression: 
   `requestsMap.values().forEach(fut -> fut.completeExceptionally(new NodeStoppingException()));`
   3. Should we clear the `requestsMap` afterwards?
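
A minimal sketch of how the three points could combine, not the PR's actual code. `StopSketch` is a hypothetical class, the nested `NodeStoppingException` is a stand-in for `org.apache.ignite.lang.NodeStoppingException`, and `String` replaces `NetworkMessage` so the example compiles on its own.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class StopSketch {
    /** Hypothetical stand-in for org.apache.ignite.lang.NodeStoppingException. */
    static class NodeStoppingException extends Exception {
    }

    /** Stand-in for the requestsMap field; String replaces NetworkMessage. */
    static final ConcurrentMap<Long, CompletableFuture<String>> requestsMap = new ConcurrentHashMap<>();

    /** Fails every pending invocation with one shared exception, then drops the entries. */
    static void stop() {
        // Point 1: construct the exception only once.
        var exception = new NodeStoppingException();

        // Point 2: single-expression lambda.
        requestsMap.values().forEach(fut -> fut.completeExceptionally(exception));

        // Point 3: release the references to the completed futures.
        requestsMap.clear();
    }

    public static void main(String[] args) {
        var pending = new CompletableFuture<String>();
        requestsMap.put(1L, pending);

        stop();

        System.out.println(pending.isCompletedExceptionally()); // prints "true"
        System.out.println(requestsMap.isEmpty()); // prints "true"
    }
}
```

One caveat with this shape: all failed futures share a single exception instance (and its stack trace), and a request registered concurrently with `stop()` could be cleared without being completed, so the real implementation may need additional coordination.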

##########
File path: 
modules/network-api/src/main/java/org/apache/ignite/network/NetworkAddress.java
##########
@@ -104,6 +121,15 @@ public int port() {
         return port;
     }
 
+    /**
+     * Returns the consistent id.
+     *
+     * @return Consistent id.
+     */
+    public String consistentId() {

Review comment:
       Should be marked as `@Nullable`

##########
File path: 
modules/network-api/src/main/java/org/apache/ignite/network/TopologyService.java
##########
@@ -54,4 +54,12 @@
      * @return The node or {@code null} if the node is not yet discovered or 
dead.
      */
     @Nullable ClusterNode getByAddress(NetworkAddress addr);
+
+    /**
+     * Returns a cluster node by it's consistent id..
+     *
+     * @param consistentId Consistent id..
+     * @return The node or {@code null} if the node is not yet discovered or 
dead.

Review comment:
       ```suggestion
        * @return The node or {@code null} if the node has not yet been 
discovered or is offline.
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.InvokeRequest;
+import org.apache.ignite.internal.network.message.InvokeResponse;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/** Default messaging service implementation. */
+public class DefaultMessagingService extends AbstractMessagingService {
+    /** Network messages factory. */
+    private final NetworkMessagesFactory factory = new 
NetworkMessagesFactory();
+
+    /** Topology service. */
+    private final TopologyService topologyService;
+
+    /** Connection manager that provides access to {@link NettySender}. */
+    private volatile ConnectionManager connectionManager;
+
+    /**
+     * This node's local socket address. Not volatile because it's piggybacked 
by the {@link #connectionManager} and connection manager
+     * is always accessed before the localAddress.
+     */
+    private InetSocketAddress localAddress;
+
+    /** Collection that maps correlation id to the future for an invocation 
request. */
+    private final ConcurrentMap<Long, CompletableFuture<NetworkMessage>> 
requestsMap = new ConcurrentHashMap<>();
+
+    /** Correlation id generator. */
+    private final AtomicLong correlationIdGenerator = new AtomicLong();
+
+    /** Fake host for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final String UNKNOWN_HOST = "unknown";
+
+    /** Fake port for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final int UNKNOWN_HOST_PORT = 1337;
+
+    /**
+     * Constructor.
+     *
+     * @param topologyService Toplogy service.
+     */
+    public DefaultMessagingService(TopologyService topologyService) {
+        this.topologyService = topologyService;
+    }
+
+    /**
+     * Resolves cyclic dependency and sets up the connection manager.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public void setConnectionManager(ConnectionManager connectionManager) {
+        this.localAddress = (InetSocketAddress) 
connectionManager.getLocalAddress();
+        this.connectionManager = connectionManager;
+        connectionManager.addListener(this::onMessage);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void weakSend(ClusterNode recipient, NetworkMessage msg) {
+        send(recipient, msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage 
msg) {
+        return send0(recipient, recipient.address(), msg, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(ClusterNode recipient, 
NetworkMessage msg, long correlationId) {
+        return send0(recipient, recipient.address(), msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(NetworkAddress addr, NetworkMessage 
msg, long correlationId) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+        return send0(recipient, addr, msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, 
NetworkMessage msg, long timeout) {
+        return invoke0(recipient, recipient.address(), msg, timeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, 
NetworkMessage msg, long timeout) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+
+        return invoke0(recipient, addr, msg, timeout);
+    }
+
+    /**
+     * Sends a message. If the target is the current node, then message will 
be delivered immediately.
+     *
+     * @param recipient Target cluster node. TODO: Maybe {@code null} due to 
IGNITE-16373.
+     * @param address Target address.
+     * @param msg Message.
+     * @param correlationId Correlation id. Not null iff the message is a 
response to the invocation request.
+     * @return Future of the send operation.
+     */
+    private CompletableFuture<Void> send0(@Nullable ClusterNode recipient, 
NetworkAddress address, NetworkMessage msg,
+            @Nullable Long correlationId) {
+        if (connectionManager.isStopped()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        InetSocketAddress addr = new InetSocketAddress(address.host(), 
address.port());
+
+        if (isSelf(recipient, address.consistentId(), addr)) {
+            if (correlationId != null) {
+                onInvokeResponse(msg, correlationId);
+            } else {
+                sendToSelf(msg, null);
+            }
+
+            return CompletableFuture.completedFuture(null);
+        }
+
+        NetworkMessage message = msg;
+
+        if (correlationId != null) {
+            message = responseFromMessage(msg, correlationId);
+        }
+
+        final NetworkMessage msg0 = message;
+
+        String recipientConsistentId;
+
+        if (recipient != null) {
+            recipientConsistentId = recipient.name();
+        } else {
+            recipientConsistentId = address.consistentId();
+        }
+
+        return connectionManager.channel(recipientConsistentId, 
addr).thenCompose(sender -> sender.send(msg0));
+    }
+
+    /**
+     * Sends an invocation request. If the target is the current node, then 
message will be delivered immediately.
+     *
+     * @param recipient Target cluster node. TODO: Maybe {@code null} due to 
IGNITE-16373.
+     * @param addr Target address.
+     * @param msg Message.
+     * @param timeout Invocation timeout.
+     * @return A future holding the response or error if the expected response 
was not received.
+     */
+    private CompletableFuture<NetworkMessage> invoke0(@Nullable ClusterNode 
recipient, NetworkAddress addr,
+            NetworkMessage msg, long timeout) {
+        if (connectionManager.isStopped()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        long correlationId = createCorrelationId();
+
+        CompletableFuture<NetworkMessage> responseFuture = new 
CompletableFuture<NetworkMessage>()
+                .orTimeout(timeout, TimeUnit.MILLISECONDS);
+
+        requestsMap.put(correlationId, responseFuture);
+
+        InetSocketAddress address = new InetSocketAddress(addr.host(), 
addr.port());
+
+        if (isSelf(recipient, addr.consistentId(), address)) {
+            sendToSelf(msg, correlationId);
+            return responseFuture;
+        }
+
+        InvokeRequest message = requestFromMessage(msg, correlationId);
+
+        String recipientConsistentId;

Review comment:
       I think a ternary operator would look better here: 
`String recipientConsistentId = recipient == null ? addr.consistentId() : recipient.name();`
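
For illustration, a self-contained sketch of the ternary form. In `invoke0` the `NetworkAddress` parameter is named `addr` (the local `address` is an `InetSocketAddress`), so the sketch reads the consistent id from `addr`. `TernarySketch`, `Node` and `Address` are hypothetical stand-ins for the surrounding class, `ClusterNode` and `NetworkAddress`; only the accessors used here are modeled.

```java
public class TernarySketch {
    /** Hypothetical stand-in for ClusterNode: only name() is modeled. */
    static final class Node {
        private final String name;

        Node(String name) {
            this.name = name;
        }

        String name() {
            return name;
        }
    }

    /** Hypothetical stand-in for NetworkAddress: only consistentId() is modeled. */
    static final class Address {
        private final String consistentId;

        Address(String consistentId) {
            this.consistentId = consistentId;
        }

        String consistentId() {
            return consistentId;
        }
    }

    /** Prefers the topology node's name and falls back to the id carried by the address. */
    static String recipientConsistentId(Node recipient, Address addr) {
        return recipient == null ? addr.consistentId() : recipient.name();
    }

    public static void main(String[] args) {
        var addr = new Address("id-from-address");

        System.out.println(recipientConsistentId(null, addr)); // prints "id-from-address"
        System.out.println(recipientConsistentId(new Node("id-from-topology"), addr)); // prints "id-from-topology"
    }
}
```

The same expression would apply in `send0`, where the `NetworkAddress` parameter is named `address`.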

##########
File path: 
modules/network/src/main/java/org/apache/ignite/internal/network/netty/HandshakeHandler.java
##########
@@ -100,13 +106,15 @@ public void exceptionCaught(ChannelHandlerContext ctx, 
Throwable cause) {
     /**
      * Handle {@link HandshakeAction}.
      *
-     * @param action Handshake action.
+     * @param result Handshake result.
      * @param ctx    Netty channel context.
      */
-    private void handleHandshakeAction(HandshakeAction action, 
ChannelHandlerContext ctx) {
+    private void handleHandshakeAction(HandshakeResult result, 
ChannelHandlerContext ctx) {
+        HandshakeAction action = result.action();
         switch (action) {
             case REMOVE_HANDLER:
-                ctx.pipeline().remove(this);
+                ChannelInboundHandlerAdapter newHandler = 
newHandlerCreator.apply(result.consistentId());
+                ctx.pipeline().replace(this, null, newHandler);

Review comment:
       Maybe it would be nice to pass a name for the new handler instead of `null`, to guard 
ourselves from races when removing the handshake handler (just in case).

##########
File path: 
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java
##########
@@ -201,10 +201,10 @@ public SocketAddress getLocalAddress() {
     /**
      * Callback that is called upon receiving a new message.
      *
-     * @param from    Source of the message.
+     * @param from    Consistent id of the message's sender.
      * @param message New message.
      */
-    private void onMessage(SocketAddress from, NetworkMessage message) {
+    private void onMessage(String from, NetworkMessage message) {

Review comment:
       ```suggestion
       private void onMessage(String consistentId, NetworkMessage message) {
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.InvokeRequest;
+import org.apache.ignite.internal.network.message.InvokeResponse;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/** Default messaging service implementation. */
+public class DefaultMessagingService extends AbstractMessagingService {
+    /** Network messages factory. */
+    private final NetworkMessagesFactory factory = new 
NetworkMessagesFactory();
+
+    /** Topology service. */
+    private final TopologyService topologyService;
+
+    /** Connection manager that provides access to {@link NettySender}. */
+    private volatile ConnectionManager connectionManager;
+
+    /**
+     * This node's local socket address. Not volatile because it's piggybacked 
by the {@link #connectionManager} and connection manager
+     * is always accessed before the localAddress.
+     */
+    private InetSocketAddress localAddress;
+
+    /** Collection that maps correlation id to the future for an invocation 
request. */
+    private final ConcurrentMap<Long, CompletableFuture<NetworkMessage>> 
requestsMap = new ConcurrentHashMap<>();
+
+    /** Correlation id generator. */
+    private final AtomicLong correlationIdGenerator = new AtomicLong();
+
+    /** Fake host for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final String UNKNOWN_HOST = "unknown";
+
+    /** Fake port for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final int UNKNOWN_HOST_PORT = 1337;
+
+    /**
+     * Constructor.
+     *
+     * @param topologyService Topology service.
+     */
+    public DefaultMessagingService(TopologyService topologyService) {
+        this.topologyService = topologyService;
+    }
+
+    /**
+     * Resolves cyclic dependency and sets up the connection manager.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public void setConnectionManager(ConnectionManager connectionManager) {
+        this.localAddress = (InetSocketAddress) 
connectionManager.getLocalAddress();
+        this.connectionManager = connectionManager;
+        connectionManager.addListener(this::onMessage);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void weakSend(ClusterNode recipient, NetworkMessage msg) {
+        send(recipient, msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage 
msg) {
+        return send0(recipient, recipient.address(), msg, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(ClusterNode recipient, 
NetworkMessage msg, long correlationId) {
+        return send0(recipient, recipient.address(), msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(NetworkAddress addr, NetworkMessage 
msg, long correlationId) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+        return send0(recipient, addr, msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, 
NetworkMessage msg, long timeout) {
+        return invoke0(recipient, recipient.address(), msg, timeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, 
NetworkMessage msg, long timeout) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+
+        return invoke0(recipient, addr, msg, timeout);
+    }
+
+    /**
+     * Sends a message. If the target is the current node, then message will 
be delivered immediately.
+     *
+     * @param recipient Target cluster node. TODO: Maybe {@code null} due to 
IGNITE-16373.
+     * @param address Target address.
+     * @param msg Message.
+     * @param correlationId Correlation id. Not null iff the message is a 
response to the invocation request.
+     * @return Future of the send operation.
+     */
+    private CompletableFuture<Void> send0(@Nullable ClusterNode recipient, 
NetworkAddress address, NetworkMessage msg,
+            @Nullable Long correlationId) {
+        if (connectionManager.isStopped()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        InetSocketAddress addr = new InetSocketAddress(address.host(), 
address.port());
+
+        if (isSelf(recipient, address.consistentId(), addr)) {
+            if (correlationId != null) {
+                onInvokeResponse(msg, correlationId);
+            } else {
+                sendToSelf(msg, null);
+            }
+
+            return CompletableFuture.completedFuture(null);
+        }
+
+        NetworkMessage message = msg;
+
+        if (correlationId != null) {
+            message = responseFromMessage(msg, correlationId);
+        }
+
+        final NetworkMessage msg0 = message;
+
+        String recipientConsistentId;
+
+        if (recipient != null) {
+            recipientConsistentId = recipient.name();
+        } else {
+            recipientConsistentId = address.consistentId();
+        }
+
+        return connectionManager.channel(recipientConsistentId, 
addr).thenCompose(sender -> sender.send(msg0));
+    }
+
+    /**
+     * Sends an invocation request. If the target is the current node, then 
message will be delivered immediately.
+     *
+     * @param recipient Target cluster node. TODO: Maybe {@code null} due to 
IGNITE-16373.
+     * @param addr Target address.
+     * @param msg Message.
+     * @param timeout Invocation timeout.
+     * @return A future holding the response or error if the expected response 
was not received.
+     */
+    private CompletableFuture<NetworkMessage> invoke0(@Nullable ClusterNode 
recipient, NetworkAddress addr,
+            NetworkMessage msg, long timeout) {
+        if (connectionManager.isStopped()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        long correlationId = createCorrelationId();
+
+        CompletableFuture<NetworkMessage> responseFuture = new 
CompletableFuture<NetworkMessage>()
+                .orTimeout(timeout, TimeUnit.MILLISECONDS);
+
+        requestsMap.put(correlationId, responseFuture);
+
+        InetSocketAddress address = new InetSocketAddress(addr.host(), 
addr.port());
+
+        if (isSelf(recipient, addr.consistentId(), address)) {
+            sendToSelf(msg, correlationId);
+            return responseFuture;
+        }
+
+        InvokeRequest message = requestFromMessage(msg, correlationId);
+
+        String recipientConsistentId;
+
+        if (recipient != null) {
+            recipientConsistentId = recipient.name();
+        } else {
+            recipientConsistentId = addr.consistentId();
+        }
+
+        return connectionManager.channel(recipientConsistentId, 
address).thenCompose(sender -> sender.send(message))
+                .thenCompose(unused -> responseFuture);
+    }
+
+    /**
+     * Sends a message to the current node.
+     *
+     * @param msg Message.
+     * @param correlationId Correlation id.
+     */
+    private void sendToSelf(NetworkMessage msg, Long correlationId) {

Review comment:
       ```suggestion
       private void sendToSelf(NetworkMessage msg, @Nullable Long correlationId) {
       ```
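
A minimal sketch of why the annotation is worth having, assuming (from the call sites in `send0` and `invoke0`) that a `null` id marks a plain message and a non-null id marks an invocation request. The `Nullable` annotation and `NullableCorrelationIdSketch` below are local, hypothetical stand-ins so the example compiles without the JetBrains annotations dependency; they are not the actual Ignite types.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Target;

/** Local stand-in for org.jetbrains.annotations.Nullable (assumption: used only so the sketch has no dependencies). */
@Target({ElementType.PARAMETER, ElementType.METHOD})
@interface Nullable {
}

/** Hypothetical, simplified model of a method that accepts an optional correlation id. */
final class NullableCorrelationIdSketch {
    /**
     * Describes a self-delivered message.
     *
     * @param msg Message payload (a String here; the real code uses NetworkMessage).
     * @param correlationId Correlation id, {@code null} if the message is not an invocation request.
     */
    static String describe(String msg, @Nullable Long correlationId) {
        // The boxed type is needed precisely because "no id" is a legal value.
        if (correlationId == null) {
            return "plain:" + msg;
        }

        return "invoke#" + correlationId + ":" + msg;
    }
}
```

With the annotation in place, static analysis can flag callers that unbox the id without a null check, and the javadoc states the contract explicitly.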

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.InvokeRequest;
+import org.apache.ignite.internal.network.message.InvokeResponse;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/** Default messaging service implementation. */
+public class DefaultMessagingService extends AbstractMessagingService {
+    /** Network messages factory. */
+    private final NetworkMessagesFactory factory = new 
NetworkMessagesFactory();
+
+    /** Topology service. */
+    private final TopologyService topologyService;
+
+    /** Connection manager that provides access to {@link NettySender}. */
+    private volatile ConnectionManager connectionManager;
+
+    /**
+     * This node's local socket address. Not volatile because it's piggybacked 
by the {@link #connectionManager} and connection manager
+     * is always accessed before the localAddress.
+     */
+    private InetSocketAddress localAddress;
+
+    /** Collection that maps correlation id to the future for an invocation 
request. */
+    private final ConcurrentMap<Long, CompletableFuture<NetworkMessage>> 
requestsMap = new ConcurrentHashMap<>();
+
+    /** Correlation id generator. */
+    private final AtomicLong correlationIdGenerator = new AtomicLong();
+
+    /** Fake host for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final String UNKNOWN_HOST = "unknown";
+
+    /** Fake port for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final int UNKNOWN_HOST_PORT = 1337;
+
+    /**
+     * Constructor.
+     *
+     * @param topologyService Topology service.
+     */
+    public DefaultMessagingService(TopologyService topologyService) {
+        this.topologyService = topologyService;
+    }
+
+    /**
+     * Resolves cyclic dependency and sets up the connection manager.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public void setConnectionManager(ConnectionManager connectionManager) {
+        this.localAddress = (InetSocketAddress) 
connectionManager.getLocalAddress();
+        this.connectionManager = connectionManager;
+        connectionManager.addListener(this::onMessage);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void weakSend(ClusterNode recipient, NetworkMessage msg) {
+        send(recipient, msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage 
msg) {
+        return send0(recipient, recipient.address(), msg, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(ClusterNode recipient, 
NetworkMessage msg, long correlationId) {
+        return send0(recipient, recipient.address(), msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(NetworkAddress addr, NetworkMessage 
msg, long correlationId) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+        return send0(recipient, addr, msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, 
NetworkMessage msg, long timeout) {
+        return invoke0(recipient, recipient.address(), msg, timeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, 
NetworkMessage msg, long timeout) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+
+        return invoke0(recipient, addr, msg, timeout);
+    }
+
+    /**
+     * Sends a message. If the target is the current node, then message will 
be delivered immediately.
+     *
+     * @param recipient Target cluster node. TODO: Maybe {@code null} due to 
IGNITE-16373.
+     * @param address Target address.
+     * @param msg Message.
+     * @param correlationId Correlation id. Not null iff the message is a 
response to the invocation request.
+     * @return Future of the send operation.
+     */
+    private CompletableFuture<Void> send0(@Nullable ClusterNode recipient, 
NetworkAddress address, NetworkMessage msg,
+            @Nullable Long correlationId) {
+        if (connectionManager.isStopped()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        InetSocketAddress addr = new InetSocketAddress(address.host(), 
address.port());
+
+        if (isSelf(recipient, address.consistentId(), addr)) {
+            if (correlationId != null) {
+                onInvokeResponse(msg, correlationId);
+            } else {
+                sendToSelf(msg, null);
+            }
+
+            return CompletableFuture.completedFuture(null);
+        }
+
+        NetworkMessage message = msg;
+
+        if (correlationId != null) {
+            message = responseFromMessage(msg, correlationId);
+        }
+
+        final NetworkMessage msg0 = message;
+
+        String recipientConsistentId;

Review comment:
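       Same here: a ternary operator would be better, e.g. `String recipientConsistentId = recipient != null ? recipient.name() : address.consistentId();`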

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.InvokeRequest;
+import org.apache.ignite.internal.network.message.InvokeResponse;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/** Default messaging service implementation. */
+public class DefaultMessagingService extends AbstractMessagingService {
+    /** Network messages factory. */
+    private final NetworkMessagesFactory factory = new NetworkMessagesFactory();

Review comment:
       This factory should be a constructor parameter; e.g. one is already created in the `ClusterServiceFactory` and could be passed in.
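
A rough sketch of the constructor-injection shape this comment asks for. `MessagesFactorySketch` and `MessagingServiceSketch` are hypothetical stand-ins for `NetworkMessagesFactory` and `DefaultMessagingService`; the real constructor signature may well differ.

```java
import java.util.Objects;

/** Hypothetical stand-in for NetworkMessagesFactory. */
final class MessagesFactorySketch {
    /** Builds a fake "response" so the sketch has something observable to return. */
    String invokeResponse(long correlationId) {
        return "InvokeResponse#" + correlationId;
    }
}

/** Hypothetical stand-in for DefaultMessagingService. */
final class MessagingServiceSketch {
    /** Factory supplied by the caller instead of being created in a field initializer. */
    private final MessagesFactorySketch factory;

    MessagingServiceSketch(MessagesFactorySketch factory) {
        this.factory = Objects.requireNonNull(factory, "factory");
    }

    MessagesFactorySketch factory() {
        return factory;
    }

    String respond(long correlationId) {
        return factory.invokeResponse(correlationId);
    }
}
```

The creating code can then build one factory and hand the same instance to every component that needs it, which also makes it easy to substitute a test double.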

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.InvokeRequest;
+import org.apache.ignite.internal.network.message.InvokeResponse;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/** Default messaging service implementation. */
+public class DefaultMessagingService extends AbstractMessagingService {
+    /** Network messages factory. */
+    private final NetworkMessagesFactory factory = new 
NetworkMessagesFactory();
+
+    /** Topology service. */
+    private final TopologyService topologyService;
+
+    /** Connection manager that provides access to {@link NettySender}. */
+    private volatile ConnectionManager connectionManager;
+
+    /**
+     * This node's local socket address. Not volatile because it's piggybacked 
by the {@link #connectionManager} and connection manager
+     * is always accessed before the localAddress.
+     */
+    private InetSocketAddress localAddress;
+
+    /** Collection that maps correlation id to the future for an invocation 
request. */
+    private final ConcurrentMap<Long, CompletableFuture<NetworkMessage>> 
requestsMap = new ConcurrentHashMap<>();
+
+    /** Correlation id generator. */
+    private final AtomicLong correlationIdGenerator = new AtomicLong();
+
+    /** Fake host for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final String UNKNOWN_HOST = "unknown";
+
+    /** Fake port for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final int UNKNOWN_HOST_PORT = 1337;
+
+    /**
+     * Constructor.
+     *
+     * @param topologyService Topology service.
+     */
+    public DefaultMessagingService(TopologyService topologyService) {
+        this.topologyService = topologyService;
+    }
+
+    /**
+     * Resolves cyclic dependency and sets up the connection manager.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public void setConnectionManager(ConnectionManager connectionManager) {
+        this.localAddress = (InetSocketAddress) 
connectionManager.getLocalAddress();
+        this.connectionManager = connectionManager;
+        connectionManager.addListener(this::onMessage);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void weakSend(ClusterNode recipient, NetworkMessage msg) {
+        send(recipient, msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage 
msg) {
+        return send0(recipient, recipient.address(), msg, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(ClusterNode recipient, 
NetworkMessage msg, long correlationId) {
+        return send0(recipient, recipient.address(), msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(NetworkAddress addr, NetworkMessage 
msg, long correlationId) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+        return send0(recipient, addr, msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, 
NetworkMessage msg, long timeout) {
+        return invoke0(recipient, recipient.address(), msg, timeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, 
NetworkMessage msg, long timeout) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+
+        return invoke0(recipient, addr, msg, timeout);
+    }
+
+    /**
+     * Sends a message. If the target is the current node, then message will 
be delivered immediately.
+     *
+     * @param recipient Target cluster node. TODO: Maybe {@code null} due to 
IGNITE-16373.
+     * @param address Target address.
+     * @param msg Message.
+     * @param correlationId Correlation id. Not null iff the message is a 
response to the invocation request.
+     * @return Future of the send operation.
+     */
+    private CompletableFuture<Void> send0(@Nullable ClusterNode recipient, 
NetworkAddress address, NetworkMessage msg,
+            @Nullable Long correlationId) {
+        if (connectionManager.isStopped()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        InetSocketAddress addr = new InetSocketAddress(address.host(), 
address.port());
+
+        if (isSelf(recipient, address.consistentId(), addr)) {
+            if (correlationId != null) {
+                onInvokeResponse(msg, correlationId);
+            } else {
+                sendToSelf(msg, null);
+            }
+
+            return CompletableFuture.completedFuture(null);
+        }
+
+        NetworkMessage message = msg;

Review comment:
       I think using a ternary operator will be better: `NetworkMessage message = correlationId == null ? msg : responseFromMessage(msg, correlationId);`
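
To illustrate the difference, here is a small sketch comparing the two shapes. It uses `String` in place of `NetworkMessage` and a hypothetical `responseFromMessage` stand-in, so it only models the control flow, not the real types. One side effect of the single assignment is that the local becomes effectively final, so the extra `msg0` copy for the lambda is no longer needed.

```java
import java.util.function.Supplier;

/** Hypothetical model of the two ways to pick the outgoing message. */
final class TernarySketch {
    /** Stand-in for responseFromMessage(msg, correlationId); the real method wraps a NetworkMessage. */
    static String responseFromMessage(String msg, long correlationId) {
        return "response[" + correlationId + "]:" + msg;
    }

    /** Original shape: reassigned local, plus a final copy so a lambda can capture it. */
    static String withIfElse(String msg, Long correlationId) {
        String message = msg;

        if (correlationId != null) {
            message = responseFromMessage(msg, correlationId);
        }

        final String msg0 = message;

        Supplier<String> send = () -> msg0;

        return send.get();
    }

    /** Suggested shape: assigned once, so the lambda can capture the local directly. */
    static String withTernary(String msg, Long correlationId) {
        String message = correlationId == null ? msg : responseFromMessage(msg, correlationId);

        Supplier<String> send = () -> message;

        return send.get();
    }
}
```

Both methods return the same value for every input; only the number of locals changes.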

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/DefaultMessagingService.java
##########
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.network.NetworkMessagesFactory;
+import org.apache.ignite.internal.network.message.InvokeRequest;
+import org.apache.ignite.internal.network.message.InvokeResponse;
+import org.apache.ignite.internal.network.message.ScaleCubeMessage;
+import org.apache.ignite.internal.network.netty.ConnectionManager;
+import org.apache.ignite.internal.network.netty.NettySender;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.jetbrains.annotations.Nullable;
+
+/** Default messaging service implementation. */
+public class DefaultMessagingService extends AbstractMessagingService {
+    /** Network messages factory. */
+    private final NetworkMessagesFactory factory = new 
NetworkMessagesFactory();
+
+    /** Topology service. */
+    private final TopologyService topologyService;
+
+    /** Connection manager that provides access to {@link NettySender}. */
+    private volatile ConnectionManager connectionManager;
+
+    /**
+     * This node's local socket address. Not volatile because it's piggybacked 
by the {@link #connectionManager} and connection manager
+     * is always accessed before the localAddress.
+     */
+    private InetSocketAddress localAddress;
+
+    /** Collection that maps correlation id to the future for an invocation 
request. */
+    private final ConcurrentMap<Long, CompletableFuture<NetworkMessage>> 
requestsMap = new ConcurrentHashMap<>();
+
+    /** Correlation id generator. */
+    private final AtomicLong correlationIdGenerator = new AtomicLong();
+
+    /** Fake host for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final String UNKNOWN_HOST = "unknown";
+
+    /** Fake port for nodes that are not in the topology yet. TODO: 
IGNITE-16373 Remove after the ticket is resolved. */
+    private static final int UNKNOWN_HOST_PORT = 1337;
+
+    /**
+     * Constructor.
+     *
+     * @param topologyService Topology service.
+     */
+    public DefaultMessagingService(TopologyService topologyService) {
+        this.topologyService = topologyService;
+    }
+
+    /**
+     * Resolves cyclic dependency and sets up the connection manager.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public void setConnectionManager(ConnectionManager connectionManager) {
+        this.localAddress = (InetSocketAddress) 
connectionManager.getLocalAddress();
+        this.connectionManager = connectionManager;
+        connectionManager.addListener(this::onMessage);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void weakSend(ClusterNode recipient, NetworkMessage msg) {
+        send(recipient, msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage 
msg) {
+        return send0(recipient, recipient.address(), msg, null);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(ClusterNode recipient, 
NetworkMessage msg, long correlationId) {
+        return send0(recipient, recipient.address(), msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<Void> respond(NetworkAddress addr, NetworkMessage 
msg, long correlationId) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+        return send0(recipient, addr, msg, correlationId);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(ClusterNode recipient, 
NetworkMessage msg, long timeout) {
+        return invoke0(recipient, recipient.address(), msg, timeout);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<NetworkMessage> invoke(NetworkAddress addr, 
NetworkMessage msg, long timeout) {
+        ClusterNode recipient = topologyService.getByAddress(addr);
+
+        return invoke0(recipient, addr, msg, timeout);
+    }
+
+    /**
+     * Sends a message. If the target is the current node, then message will 
be delivered immediately.
+     *
+     * @param recipient Target cluster node. TODO: Maybe {@code null} due to 
IGNITE-16373.
+     * @param address Target address.
+     * @param msg Message.
+     * @param correlationId Correlation id. Not null iff the message is a 
response to the invocation request.
+     * @return Future of the send operation.
+     */
+    private CompletableFuture<Void> send0(@Nullable ClusterNode recipient, 
NetworkAddress address, NetworkMessage msg,
+            @Nullable Long correlationId) {
+        if (connectionManager.isStopped()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        InetSocketAddress addr = new InetSocketAddress(address.host(), 
address.port());
+
+        if (isSelf(recipient, address.consistentId(), addr)) {
+            if (correlationId != null) {
+                onInvokeResponse(msg, correlationId);
+            } else {
+                sendToSelf(msg, null);
+            }
+
+            return CompletableFuture.completedFuture(null);
+        }
+
+        NetworkMessage message = msg;
+
+        if (correlationId != null) {
+            message = responseFromMessage(msg, correlationId);
+        }
+
+        final NetworkMessage msg0 = message;
+
+        String recipientConsistentId;
+
+        if (recipient != null) {
+            recipientConsistentId = recipient.name();
+        } else {
+            recipientConsistentId = address.consistentId();
+        }
+
+        return connectionManager.channel(recipientConsistentId, 
addr).thenCompose(sender -> sender.send(msg0));
+    }
+
+    /**
+     * Sends an invocation request. If the target is the current node, then 
message will be delivered immediately.
+     *
+     * @param recipient Target cluster node. TODO: Maybe {@code null} due to 
IGNITE-16373.
+     * @param addr Target address.
+     * @param msg Message.
+     * @param timeout Invocation timeout.
+     * @return A future holding the response or error if the expected response 
was not received.
+     */
+    private CompletableFuture<NetworkMessage> invoke0(@Nullable ClusterNode 
recipient, NetworkAddress addr,
+            NetworkMessage msg, long timeout) {
+        if (connectionManager.isStopped()) {
+            return failedFuture(new NodeStoppingException());
+        }
+
+        long correlationId = createCorrelationId();
+
+        CompletableFuture<NetworkMessage> responseFuture = new 
CompletableFuture<NetworkMessage>()
+                .orTimeout(timeout, TimeUnit.MILLISECONDS);
+
+        requestsMap.put(correlationId, responseFuture);
+
+        InetSocketAddress address = new InetSocketAddress(addr.host(), 
addr.port());
+
+        if (isSelf(recipient, addr.consistentId(), address)) {
+            sendToSelf(msg, correlationId);
+            return responseFuture;
+        }
+
+        InvokeRequest message = requestFromMessage(msg, correlationId);
+
+        String recipientConsistentId;
+
+        if (recipient != null) {
+            recipientConsistentId = recipient.name();
+        } else {
+            recipientConsistentId = addr.consistentId();
+        }
+
+        return connectionManager.channel(recipientConsistentId, 
address).thenCompose(sender -> sender.send(message))
+                .thenCompose(unused -> responseFuture);
+    }
+
+    /**
+     * Sends a message to the current node.
+     *
+     * @param msg Message.
+     * @param correlationId Correlation id.
+     */
+    private void sendToSelf(NetworkMessage msg, Long correlationId) {
+        var address = new NetworkAddress(localAddress.getHostName(), 
localAddress.getPort(), connectionManager.consistentId());
+
+        for (NetworkMessageHandler networkMessageHandler : 
getMessageHandlers(msg.groupType())) {
+            networkMessageHandler.onReceived(msg, address, correlationId);
+        }
+    }
+
+    /**
+     * Handles an incoming message.
+     *
+     * @param consistentId Sender's consistent id.
+     * @param msg Incoming message.
+     */
+    private void onMessage(String consistentId, NetworkMessage msg) {
+        if (msg instanceof ScaleCubeMessage) {
+            // ScaleCube messages are handled in the ScaleCubeTransport
+            return;
+        }
+
+        if (msg instanceof InvokeResponse) {
+            // Handle invocation response

Review comment:
       This comment states the obvious; consider removing it.
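
For readers following the hunk above, the request/response correlation it builds on can be sketched roughly as follows. This is a simplified, hypothetical model: `CorrelationSketch`, its `String` payloads, and the `transport` callback are illustrative assumptions, and the body of the real `onInvokeResponse` is not shown in this diff. Removing the map entry when the future completes (including on timeout) is also an assumption of the sketch, not something the quoted code is known to do.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** Hypothetical, simplified model of matching responses to pending invocations. */
final class CorrelationSketch {
    /** Maps correlation id to the future of a pending invocation. */
    private final ConcurrentMap<Long, CompletableFuture<String>> requests = new ConcurrentHashMap<>();

    /** Correlation id generator. */
    private final AtomicLong idGenerator = new AtomicLong();

    /**
     * Registers a pending invocation and hands its id to the transport.
     *
     * @param timeoutMillis Timeout after which the future fails with a TimeoutException.
     * @param transport Stand-in for "send the request over the wire"; receives the correlation id.
     */
    CompletableFuture<String> invoke(long timeoutMillis, LongConsumer transport) {
        long id = idGenerator.incrementAndGet();

        CompletableFuture<String> future = new CompletableFuture<String>()
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);

        requests.put(id, future);

        // Assumption of this sketch: drop the entry on any completion, so timed-out requests do not linger.
        future.whenComplete((res, err) -> requests.remove(id));

        transport.accept(id);

        return future;
    }

    /** Completes the matching invocation; returns false if none is pending for this id. */
    boolean onResponse(long id, String payload) {
        CompletableFuture<String> future = requests.remove(id);

        return future != null && future.complete(payload);
    }

    int pendingCount() {
        return requests.size();
    }
}
```

A response that arrives after the timeout, or one carrying an unknown id, simply finds no entry in the map and is ignored.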




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
