sashapolo commented on a change in pull request #102:
URL: https://github.com/apache/ignite-3/pull/102#discussion_r628022267



##########
File path: 
modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
##########
@@ -251,13 +254,16 @@ private static void stopForcefully(ClusterService 
cluster) throws Exception {
     }
 
     /**
-     * Wrapper for cluster.
+     * Wrapper for a cluster.
      */
     private static final class Cluster {
         /** */
-        private static final MessageSerializationRegistry 
SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
+        private final ClusterServiceFactory NETWORK_FACTORY = new 
ScaleCubeClusterServiceFactory();

Review comment:
       These fields are no longer static, so they should be renamed to follow the instance-field naming convention (e.g. `NETWORK_FACTORY` -> `networkFactory`).

##########
File path: 
modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettyClient;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyServer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link ConnectionManager}.
+ */
+public class ConnectionManagerTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** */
+    @AfterEach
+    void tearDown() {
+        startedManagers.forEach(manager -> manager.stop());
+    }
+
+    /**
+     * Tests that a message is sent successfully using the ConnectionManager.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSentSuccessfully() throws Exception {
+        String msgText = "test";
+
+        int port1 = 4000;
+        int port2 = 4001;
+
+        ConnectionManager manager1 = startManager(port1);
+        ConnectionManager manager2 = startManager(port2);
+
+        var fut = new CompletableFuture<NetworkMessage>();
+
+        manager2.addListener((address, message) -> {
+            fut.complete(message);
+        });
+
+        NettySender sender = manager1.channel(address(port2)).get();
+
+        TestMessage testMessage = new TestMessage(msgText, new HashMap<>());
+
+        sender.send(testMessage).join();
+
+        NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+
+        assertEquals(TestMessage.class, receivedMessage.getClass());
+        assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+    }
+
+    /**
+     * Tests that the resources of a connection manager are closed after a 
shutdown.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testShutdown() throws Exception {
+        int port1 = 4000;
+        int port2 = 4001;
+
+        ConnectionManager manager1 = startManager(port1);
+        ConnectionManager manager2 = startManager(port2);
+
+        NettySender sender1 = manager1.channel(address(port2)).get();
+        NettySender sender2 = manager2.channel(address(port1)).get();
+
+        assertNotNull(sender1);
+        assertNotNull(sender2);
+
+        Stream.of(manager1, manager2).forEach(manager -> {
+            NettyServer server = manager.server();
+            Collection<NettyClient> clients = manager.clients();
+
+            manager.stop();
+
+            assertFalse(server.isRunning());
+
+            boolean clientsStopped = 
clients.stream().allMatch(NettyClient::isDisconnected);
+
+            assertTrue(clientsStopped);
+        });
+    }
+
+    /**
+     * Tests that after a channel was closed, a new channel is opened upon a 
request.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCanReconnectAfterFail() throws Exception {
+        String msgText = "test";
+
+        int port1 = 4000;
+        int port2 = 4001;
+
+        ConnectionManager manager1 = startManager(port1);
+        ConnectionManager manager2 = startManager(port2);
+
+        NettySender sender = manager1.channel(address(port2)).get();
+
+        TestMessage testMessage = new TestMessage(msgText, new HashMap<>());

Review comment:
       ```suggestion
           TestMessage testMessage = new TestMessage(msgText);
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(ConnectionManager.class);
+
+    /** Latest version of the direct marshalling protocol. */
+    public static final byte DIRECT_PROTOCOL_VERSION = 1;
+
+    /** Client bootstrap. */
+    private final Bootstrap clientBootstrap;
+
+    /** Client socket channel handler event loop group. */
+    private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup();
+
+    /** Server. */
+    private final NettyServer server;
+
+    /** Channels. */
+    private final Map<SocketAddress, NettySender> channels = new 
ConcurrentHashMap<>();
+
+    /** Clients. */
+    private final Map<SocketAddress, NettyClient> clients = new 
ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private final List<BiConsumer<SocketAddress, NetworkMessage>> listeners = 
new CopyOnWriteArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param port Server port.
+     * @param registry Serialization registry.
+     */
+    public ConnectionManager(int port, MessageSerializationRegistry registry) {
+        this.serializationRegistry = registry;
+        this.server = new NettyServer(port, this::onNewIncomingChannel, 
this::onMessage, serializationRegistry);
+        this.clientBootstrap = NettyClient.createBootstrap(clientWorkerGroup, 
serializationRegistry, this::onMessage);
+    }
+
+    /**
+     * Starts the server.
+     *
+     * @throws IgniteInternalException If failed to start.
+     */
+    public void start() throws IgniteInternalException {
+        try {
+            server.start().join();
+        }
+        catch (CompletionException e) {
+            Throwable cause = e.getCause();
+            throw new IgniteInternalException("Failed to start server: " + 
cause.getMessage(), cause);
+        }
+    }
+
+    /**
+     * @return Server local address.
+     */
+    public SocketAddress getLocalAddress() {
+        return server.address();
+    }
+
+    /**
+     * Gets a {@link NettySender}, that sends data from this node to another 
node with the specified address.
+     * @param address Another node's address.
+     * @return Sender.
+     */
+    public CompletableFuture<NettySender> channel(SocketAddress address) {
+        NettySender channel = channels.compute(
+            address,
+            (addr, sender) -> (sender == null || !sender.isOpen()) ? null : 
sender
+        );
+
+        if (channel != null)
+            return CompletableFuture.completedFuture(channel);
+
+        NettyClient client = clients.compute(address, (addr, existingClient) 
-> {
+            if (existingClient != null && !existingClient.failedToConnect() && 
!existingClient.isDisconnected())
+                return existingClient;
+
+            return connect(addr);
+        });
+
+        return client.sender();
+    }
+
+    /**
+     * Callback that is called upon receiving of a new message.
+     *
+     * @param from Source of the message.
+     * @param message New message.
+     */
+    private void onMessage(SocketAddress from, NetworkMessage message) {
+        listeners.forEach(consumer -> consumer.accept(from, message));
+    }
+
+    /**
+     * Callback that is called upon new client connected to the server.
+     *
+     * @param channel Channel from client to this {@link #server}.
+     */
+    private void onNewIncomingChannel(NettySender channel) {
+        SocketAddress remoteAddress = channel.remoteAddress();
+        channels.put(remoteAddress, channel);
+    }
+
+    /**
+     * Create new client from this node to specified address.
+     *
+     * @param address Target address.
+     * @return New netty client.
+     */
+    private NettyClient connect(SocketAddress address) {
+        NettyClient client = new NettyClient(

Review comment:
       This fits on one line:
   ```
   NettyClient client = new NettyClient(address, serializationRegistry);
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(ConnectionManager.class);
+
+    /** Latest version of the direct marshalling protocol. */
+    public static final byte DIRECT_PROTOCOL_VERSION = 1;
+
+    /** Client bootstrap. */
+    private final Bootstrap clientBootstrap;
+
+    /** Client socket channel handler event loop group. */
+    private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup();
+
+    /** Server. */
+    private final NettyServer server;
+
+    /** Channels. */
+    private final Map<SocketAddress, NettySender> channels = new 
ConcurrentHashMap<>();
+
+    /** Clients. */
+    private final Map<SocketAddress, NettyClient> clients = new 
ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private final List<BiConsumer<SocketAddress, NetworkMessage>> listeners = 
new CopyOnWriteArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param port Server port.
+     * @param registry Serialization registry.
+     */
+    public ConnectionManager(int port, MessageSerializationRegistry registry) {
+        this.serializationRegistry = registry;
+        this.server = new NettyServer(port, this::onNewIncomingChannel, 
this::onMessage, serializationRegistry);
+        this.clientBootstrap = NettyClient.createBootstrap(clientWorkerGroup, 
serializationRegistry, this::onMessage);
+    }
+
+    /**
+     * Starts the server.
+     *
+     * @throws IgniteInternalException If failed to start.
+     */
+    public void start() throws IgniteInternalException {
+        try {
+            server.start().join();
+        }
+        catch (CompletionException e) {
+            Throwable cause = e.getCause();
+            throw new IgniteInternalException("Failed to start server: " + 
cause.getMessage(), cause);
+        }
+    }
+
+    /**
+     * @return Server local address.
+     */
+    public SocketAddress getLocalAddress() {
+        return server.address();
+    }
+
+    /**
+     * Gets a {@link NettySender}, that sends data from this node to another 
node with the specified address.
+     * @param address Another node's address.
+     * @return Sender.
+     */
+    public CompletableFuture<NettySender> channel(SocketAddress address) {
+        NettySender channel = channels.compute(
+            address,
+            (addr, sender) -> (sender == null || !sender.isOpen()) ? null : 
sender
+        );
+
+        if (channel != null)
+            return CompletableFuture.completedFuture(channel);
+
+        NettyClient client = clients.compute(address, (addr, existingClient) 
-> {
+            if (existingClient != null && !existingClient.failedToConnect() && 
!existingClient.isDisconnected())
+                return existingClient;
+
+            return connect(addr);
+        });
+
+        return client.sender();
+    }
+
+    /**
+     * Callback that is called upon receiving of a new message.
+     *
+     * @param from Source of the message.
+     * @param message New message.
+     */
+    private void onMessage(SocketAddress from, NetworkMessage message) {
+        listeners.forEach(consumer -> consumer.accept(from, message));
+    }
+
+    /**
+     * Callback that is called upon new client connected to the server.
+     *
+     * @param channel Channel from client to this {@link #server}.
+     */
+    private void onNewIncomingChannel(NettySender channel) {
+        SocketAddress remoteAddress = channel.remoteAddress();
+        channels.put(remoteAddress, channel);
+    }
+
+    /**
+     * Create new client from this node to specified address.
+     *
+     * @param address Target address.
+     * @return New netty client.
+     */
+    private NettyClient connect(SocketAddress address) {
+        NettyClient client = new NettyClient(
+            address,
+            serializationRegistry
+        );
+
+        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
+            if (throwable != null)
+                clients.remove(address);
+            else
+                channels.put(address, sender);
+        });
+
+        return client;
+    }
+
+    /**
+     * Add incoming message listener.
+     *
+     * @param listener Message listener.
+     */
+    public void addListener(BiConsumer<SocketAddress, NetworkMessage> 
listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Stops the server and all clients.
+     */
+    public void stop() {
+         var stream = Stream.concat(
+            clients.values().stream().map(NettyClient::stop),
+            Stream.of(server.stop())
+        );
+
+         var stopFut = 
CompletableFuture.allOf(stream.toArray(CompletableFuture<?>[]::new));
+
+         try {
+             stopFut.join();
+             clientWorkerGroup.shutdownGracefully().sync();
+         }
+         catch (Exception e) {
+             LOG.warn("Failed to stop ConnectionManager: " + e.getMessage());

Review comment:
       ```suggestion
                LOG.warn("Failed to stop the ConnectionManager: " + 
e.getMessage());
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/DelegatingTransportFactory.java
##########
@@ -35,50 +34,66 @@
     /** */
     private final ScaleCubeMessagingService messagingService;
 
+    /** Delegate transport factory. */
+    private final TransportFactory factory;
+
     /**
      * @param messagingService Messaging service.
+     * @param factory Delegate transport factory.
      */
-    DelegatingTransportFactory(ScaleCubeMessagingService messagingService) {
+    DelegatingTransportFactory(ScaleCubeMessagingService messagingService, 
TransportFactory factory) {
         this.messagingService = messagingService;
+        this.factory = factory;
     }
 
     /** {@inheritDoc} */
     @Override public Transport createTransport(TransportConfig config) {
-        var delegateFactory = TransportFactory.INSTANCE == null ? new 
TcpTransportFactory() : TransportFactory.INSTANCE;
-
-        Transport delegate = delegateFactory.createTransport(config);
+        Transport delegate = factory.createTransport(config);

Review comment:
       This factory is no longer needed; you can simply handle the self-request 
case correctly inside our custom transport.

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.internal.MessageReader;
+import org.apache.ignite.network.internal.direct.DirectMarshallingUtils;
+import org.apache.ignite.network.internal.direct.DirectMessageReader;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Decodes {@link ByteBuf}s into {@link NetworkMessage}s.
+ */
+public class InboundDecoder extends ByteToMessageDecoder {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(InboundDecoder.class);
+
+    /** Message reader channel attribute key. */
+    private static final AttributeKey<MessageReader> READER_KEY = 
AttributeKey.valueOf("READER");
+
+    /** Message deserializer channel attribute key. */
+    private static final AttributeKey<MessageDeserializer<NetworkMessage>> 
DESERIALIZER_KEY = AttributeKey.valueOf("DESERIALIZER");
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /**
+     * Constructor.
+     *
+     * @param serializationRegistry Serialization registry.
+     */
+    public InboundDecoder(MessageSerializationRegistry serializationRegistry) {
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, 
List<Object> out) throws Exception {
+        ByteBuffer buffer = in.nioBuffer();
+
+        Attribute<MessageReader> readerAttr = ctx.channel().attr(READER_KEY);
+        MessageReader reader = readerAttr.get();
+
+        if (reader == null) {
+            reader = new DirectMessageReader(serializationRegistry, 
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+            readerAttr.set(reader);

Review comment:
       Should we use a more thread-safe approach here (e.g. `compareAndSet`)?

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/internal/AllTypesMessageGenerator.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal;
+
+import java.lang.reflect.Field;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.lang.IgniteUuidGenerator;
+import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+
+/**
+ * Generator for an {@link AllTypesMessage}.
+ */
+public class AllTypesMessageGenerator {
+    /**
+     * Generate a new {@link AllTypesMessage}.
+     *
+     * @param seed Random seed.
+     * @param nestedMsg {@code true} if nested messages should be generated, 
{@code false} otherwise.
+     * @return Message.
+     * @throws Exception If failed.
+     */
+    public static AllTypesMessage generate(long seed, boolean nestedMsg) {
+        try {
+            var random = new Random(seed);
+
+            var message = new AllTypesMessage();
+
+            Field[] fields = AllTypesMessage.class.getDeclaredFields();
+
+            for (Field field : fields) {
+                field.setAccessible(true);

Review comment:
       Aren't these fields already accessible?

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyClientTest.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link NettyClient}.
+ */
+class NettyClientTest {
+    /** */
+    private final SocketAddress address = 
InetSocketAddress.createUnresolved("", 0);
+
+    /**
+     * Tests a scenario where NettyClient connects successfully.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSuccessfullConnect() throws InterruptedException, 
ExecutionException, TimeoutException {

Review comment:
       ```suggestion
       public void testSuccessfulConnect() throws Exception {
   ```

##########
File path: 
modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettyClient;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyServer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link ConnectionManager}.
+ */
+public class ConnectionManagerTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** */
+    @AfterEach
+    void tearDown() {
+        startedManagers.forEach(manager -> manager.stop());
+    }
+
+    /**
+     * Tests that a message is sent successfully using the ConnectionManager.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSentSuccessfully() throws Exception {
+        String msgText = "test";
+
+        int port1 = 4000;
+        int port2 = 4001;
+
+        ConnectionManager manager1 = startManager(port1);
+        ConnectionManager manager2 = startManager(port2);
+
+        var fut = new CompletableFuture<NetworkMessage>();
+
+        manager2.addListener((address, message) -> {
+            fut.complete(message);
+        });
+
+        NettySender sender = manager1.channel(address(port2)).get();
+
+        TestMessage testMessage = new TestMessage(msgText, new HashMap<>());
+
+        sender.send(testMessage).join();
+
+        NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+
+        assertEquals(TestMessage.class, receivedMessage.getClass());
+        assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+    }
+
+    /**
+     * Tests that the resources of a connection manager are closed after a 
shutdown.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testShutdown() throws Exception {
+        int port1 = 4000;
+        int port2 = 4001;
+
+        ConnectionManager manager1 = startManager(port1);
+        ConnectionManager manager2 = startManager(port2);
+
+        NettySender sender1 = manager1.channel(address(port2)).get();
+        NettySender sender2 = manager2.channel(address(port1)).get();
+
+        assertNotNull(sender1);
+        assertNotNull(sender2);
+
+        Stream.of(manager1, manager2).forEach(manager -> {
+            NettyServer server = manager.server();
+            Collection<NettyClient> clients = manager.clients();
+
+            manager.stop();
+
+            assertFalse(server.isRunning());
+
+            boolean clientsStopped = 
clients.stream().allMatch(NettyClient::isDisconnected);
+
+            assertTrue(clientsStopped);
+        });
+    }
+
+    /**
+     * Tests that after a channel was closed, a new channel is opened upon a 
request.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCanReconnectAfterFail() throws Exception {
+        String msgText = "test";
+
+        int port1 = 4000;
+        int port2 = 4001;
+
+        ConnectionManager manager1 = startManager(port1);
+        ConnectionManager manager2 = startManager(port2);
+
+        NettySender sender = manager1.channel(address(port2)).get();
+
+        TestMessage testMessage = new TestMessage(msgText, new HashMap<>());
+
+        manager2.stop();
+
+        final var finalSender = sender;
+
+        assertThrows(ClosedChannelException.class, () -> {
+            try {
+                finalSender.send(testMessage).join();
+            }
+            catch (Exception e) {
+                throw e.getCause();
+            }
+        });
+
+        manager2 = startManager(port2);
+
+        var fut = new CompletableFuture<NetworkMessage>();
+
+        manager2.addListener((address, message) -> {
+            fut.complete(message);
+        });
+
+        sender = manager1.channel(address(port2)).get();
+
+        sender.send(testMessage).join();
+
+        NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+
+        assertEquals(TestMessage.class, receivedMessage.getClass());

Review comment:
       same here

##########
File path: 
modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettyClient;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyServer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link ConnectionManager}.
+ */
+public class ConnectionManagerTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** */
+    @AfterEach
+    void tearDown() {
+        startedManagers.forEach(manager -> manager.stop());
+    }
+
+    /**
+     * Tests that a message is sent successfully using the ConnectionManager.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSentSuccessfully() throws Exception {
+        String msgText = "test";
+
+        int port1 = 4000;
+        int port2 = 4001;
+
+        ConnectionManager manager1 = startManager(port1);
+        ConnectionManager manager2 = startManager(port2);
+
+        var fut = new CompletableFuture<NetworkMessage>();
+
+        manager2.addListener((address, message) -> {

Review comment:
       `manager2.addListener((address, message) -> fut.complete(message));`

##########
File path: 
modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettyClient;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyServer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link ConnectionManager}.
+ */
+public class ConnectionManagerTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** */
+    @AfterEach
+    void tearDown() {
+        startedManagers.forEach(manager -> manager.stop());
+    }
+
+    /**
+     * Tests that a message is sent successfully using the ConnectionManager.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSentSuccessfully() throws Exception {
+        String msgText = "test";
+
+        int port1 = 4000;
+        int port2 = 4001;
+
+        ConnectionManager manager1 = startManager(port1);
+        ConnectionManager manager2 = startManager(port2);
+
+        var fut = new CompletableFuture<NetworkMessage>();
+
+        manager2.addListener((address, message) -> {
+            fut.complete(message);
+        });
+
+        NettySender sender = manager1.channel(address(port2)).get();
+
+        TestMessage testMessage = new TestMessage(msgText, new HashMap<>());

Review comment:
       ```suggestion
           TestMessage testMessage = new TestMessage(msgText);
   ```

##########
File path: 
modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettyClient;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyServer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link ConnectionManager}.
+ */
+public class ConnectionManagerTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** */
+    @AfterEach
+    void tearDown() {
+        startedManagers.forEach(manager -> manager.stop());
+    }
+
+    /**
+     * Tests that a message is sent successfully using the ConnectionManager.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSentSuccessfully() throws Exception {
+        String msgText = "test";
+
+        int port1 = 4000;
+        int port2 = 4001;
+
+        ConnectionManager manager1 = startManager(port1);
+        ConnectionManager manager2 = startManager(port2);
+
+        var fut = new CompletableFuture<NetworkMessage>();
+
+        manager2.addListener((address, message) -> {
+            fut.complete(message);
+        });
+
+        NettySender sender = manager1.channel(address(port2)).get();
+
+        TestMessage testMessage = new TestMessage(msgText, new HashMap<>());
+
+        sender.send(testMessage).join();
+
+        NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+
+        assertEquals(TestMessage.class, receivedMessage.getClass());

Review comment:
       Strictly speaking, this assert is redundant, since the cast on the next 
line will throw a ClassCastException if the message class is incorrect.

##########
File path: 
modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettyClient;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyServer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link ConnectionManager}.
+ */
+public class ConnectionManagerTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** */
+    @AfterEach
+    void tearDown() {
+        startedManagers.forEach(manager -> manager.stop());

Review comment:
       ```suggestion
           startedManagers.forEach(ConnectionManager::stop);
   ```

##########
File path: 
modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettyClient;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyServer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link ConnectionManager}.
+ */
+public class ConnectionManagerTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** */
+    @AfterEach
+    void tearDown() {
+        startedManagers.forEach(manager -> manager.stop());
+    }
+
+    /**
+     * Tests that a message is sent successfully using the ConnectionManager.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSentSuccessfully() throws Exception {
+        String msgText = "test";
+
+        int port1 = 4000;
+        int port2 = 4001;
+
+        ConnectionManager manager1 = startManager(port1);
+        ConnectionManager manager2 = startManager(port2);
+
+        var fut = new CompletableFuture<NetworkMessage>();
+
+        manager2.addListener((address, message) -> {
+            fut.complete(message);
+        });
+
+        NettySender sender = manager1.channel(address(port2)).get();
+
+        TestMessage testMessage = new TestMessage(msgText, new HashMap<>());
+
+        sender.send(testMessage).join();
+
+        NetworkMessage receivedMessage = fut.get(3, TimeUnit.SECONDS);
+
+        assertEquals(TestMessage.class, receivedMessage.getClass());
+        assertEquals(msgText, ((TestMessage) receivedMessage).msg());
+    }
+
+    /**
+     * Tests that the resources of a connection manager are closed after a 
shutdown.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testShutdown() throws Exception {
+        int port1 = 4000;
+        int port2 = 4001;
+
+        ConnectionManager manager1 = startManager(port1);
+        ConnectionManager manager2 = startManager(port2);
+
+        NettySender sender1 = manager1.channel(address(port2)).get();
+        NettySender sender2 = manager2.channel(address(port1)).get();
+
+        assertNotNull(sender1);
+        assertNotNull(sender2);
+
+        Stream.of(manager1, manager2).forEach(manager -> {
+            NettyServer server = manager.server();
+            Collection<NettyClient> clients = manager.clients();
+
+            manager.stop();
+
+            assertFalse(server.isRunning());
+
+            boolean clientsStopped = 
clients.stream().allMatch(NettyClient::isDisconnected);
+
+            assertTrue(clientsStopped);
+        });
+    }
+
+    /**
+     * Tests that after a channel was closed, a new channel is opened upon a 
request.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCanReconnectAfterFail() throws Exception {
+        String msgText = "test";
+
+        int port1 = 4000;
+        int port2 = 4001;
+
+        ConnectionManager manager1 = startManager(port1);
+        ConnectionManager manager2 = startManager(port2);
+
+        NettySender sender = manager1.channel(address(port2)).get();
+
+        TestMessage testMessage = new TestMessage(msgText, new HashMap<>());
+
+        manager2.stop();
+
+        final var finalSender = sender;
+
+        assertThrows(ClosedChannelException.class, () -> {
+            try {
+                finalSender.send(testMessage).join();
+            }
+            catch (Exception e) {
+                throw e.getCause();
+            }
+        });
+
+        manager2 = startManager(port2);
+
+        var fut = new CompletableFuture<NetworkMessage>();
+
+        manager2.addListener((address, message) -> {

Review comment:
       `manager2.addListener((address, message) -> fut.complete(message));`

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(ConnectionManager.class);
+
+    /** Latest version of the direct marshalling protocol. */
+    public static final byte DIRECT_PROTOCOL_VERSION = 1;
+
+    /** Client bootstrap. */
+    private final Bootstrap clientBootstrap;
+
+    /** Client socket channel handler event loop group. */
+    private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup();
+
+    /** Server. */
+    private final NettyServer server;
+
+    /** Channels. */
+    private final Map<SocketAddress, NettySender> channels = new 
ConcurrentHashMap<>();
+
+    /** Clients. */
+    private final Map<SocketAddress, NettyClient> clients = new 
ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private final List<BiConsumer<SocketAddress, NetworkMessage>> listeners = 
new CopyOnWriteArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param port Server port.
+     * @param registry Serialization registry.
+     */
+    public ConnectionManager(int port, MessageSerializationRegistry registry) {
+        this.serializationRegistry = registry;
+        this.server = new NettyServer(port, this::onNewIncomingChannel, 
this::onMessage, serializationRegistry);
+        this.clientBootstrap = NettyClient.createBootstrap(clientWorkerGroup, 
serializationRegistry, this::onMessage);
+    }
+
+    /**
+     * Starts the server.
+     *
+     * @throws IgniteInternalException If failed to start.
+     */
+    public void start() throws IgniteInternalException {
+        try {
+            server.start().join();
+        }
+        catch (CompletionException e) {
+            Throwable cause = e.getCause();
+            throw new IgniteInternalException("Failed to start server: " + 
cause.getMessage(), cause);
+        }
+    }
+
+    /**
+     * @return Server local address.
+     */
+    public SocketAddress getLocalAddress() {
+        return server.address();
+    }
+
+    /**
+     * Gets a {@link NettySender}, that sends data from this node to another 
node with the specified address.
+     * @param address Another node's address.
+     * @return Sender.
+     */
+    public CompletableFuture<NettySender> channel(SocketAddress address) {
+        NettySender channel = channels.compute(
+            address,
+            (addr, sender) -> (sender == null || !sender.isOpen()) ? null : 
sender
+        );
+
+        if (channel != null)
+            return CompletableFuture.completedFuture(channel);
+
+        NettyClient client = clients.compute(address, (addr, existingClient) 
-> {
+            if (existingClient != null && !existingClient.failedToConnect() && 
!existingClient.isDisconnected())

Review comment:
       Can be written a little bit shorter:
   ```
   NettyClient client = clients.compute(address, (addr, existingClient) ->
       existingClient != null && !existingClient.failedToConnect() && 
!existingClient.isDisconnected() ?
           existingClient :
           connect(addr)
   );
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(ConnectionManager.class);
+
+    /** Latest version of the direct marshalling protocol. */
+    public static final byte DIRECT_PROTOCOL_VERSION = 1;
+
+    /** Client bootstrap. */
+    private final Bootstrap clientBootstrap;
+
+    /** Client socket channel handler event loop group. */
+    private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup();
+
+    /** Server. */
+    private final NettyServer server;
+
+    /** Channels. */
+    private final Map<SocketAddress, NettySender> channels = new 
ConcurrentHashMap<>();
+
+    /** Clients. */
+    private final Map<SocketAddress, NettyClient> clients = new 
ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private final List<BiConsumer<SocketAddress, NetworkMessage>> listeners = 
new CopyOnWriteArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param port Server port.
+     * @param registry Serialization registry.
+     */
+    public ConnectionManager(int port, MessageSerializationRegistry registry) {
+        this.serializationRegistry = registry;
+        this.server = new NettyServer(port, this::onNewIncomingChannel, 
this::onMessage, serializationRegistry);
+        this.clientBootstrap = NettyClient.createBootstrap(clientWorkerGroup, 
serializationRegistry, this::onMessage);
+    }
+
+    /**
+     * Starts the server.
+     *
+     * @throws IgniteInternalException If failed to start.
+     */
+    public void start() throws IgniteInternalException {
+        try {
+            server.start().join();
+        }
+        catch (CompletionException e) {
+            Throwable cause = e.getCause();
+            throw new IgniteInternalException("Failed to start server: " + 
cause.getMessage(), cause);
+        }
+    }
+
+    /**
+     * @return Server local address.
+     */
+    public SocketAddress getLocalAddress() {
+        return server.address();
+    }
+
+    /**
+     * Gets a {@link NettySender}, that sends data from this node to another 
node with the specified address.
+     * @param address Another node's address.
+     * @return Sender.
+     */
+    public CompletableFuture<NettySender> channel(SocketAddress address) {
+        NettySender channel = channels.compute(
+            address,
+            (addr, sender) -> (sender == null || !sender.isOpen()) ? null : 
sender
+        );
+
+        if (channel != null)
+            return CompletableFuture.completedFuture(channel);
+
+        NettyClient client = clients.compute(address, (addr, existingClient) 
-> {
+            if (existingClient != null && !existingClient.failedToConnect() && 
!existingClient.isDisconnected())
+                return existingClient;
+
+            return connect(addr);
+        });
+
+        return client.sender();
+    }
+
+    /**
+     * Callback that is called upon receiving of a new message.
+     *
+     * @param from Source of the message.
+     * @param message New message.
+     */
+    private void onMessage(SocketAddress from, NetworkMessage message) {
+        listeners.forEach(consumer -> consumer.accept(from, message));
+    }
+
+    /**
+     * Callback that is called upon new client connected to the server.
+     *
+     * @param channel Channel from client to this {@link #server}.
+     */
+    private void onNewIncomingChannel(NettySender channel) {
+        SocketAddress remoteAddress = channel.remoteAddress();
+        channels.put(remoteAddress, channel);
+    }
+
+    /**
+     * Create new client from this node to specified address.
+     *
+     * @param address Target address.
+     * @return New netty client.
+     */
+    private NettyClient connect(SocketAddress address) {
+        NettyClient client = new NettyClient(
+            address,
+            serializationRegistry
+        );
+
+        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
+            if (throwable != null)
+                clients.remove(address);
+            else
+                channels.put(address, sender);
+        });
+
+        return client;
+    }
+
+    /**
+     * Add incoming message listener.
+     *
+     * @param listener Message listener.
+     */
+    public void addListener(BiConsumer<SocketAddress, NetworkMessage> 
listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Stops the server and all clients.
+     */
+    public void stop() {
+         var stream = Stream.concat(

Review comment:
       Wouldn't the following approach be shorter and more correct?
   ```
   var stream = Stream.concat(
       clients.values().stream().map(NettyClient::stop),
       Stream.of(server.stop(), clientWorkerGroup.shutdownGracefully())
   );

   stream.forEach(fut -> {
       try {
           fut.get();
       } catch (Exception e) {
           LOG.warn("Failed to stop the ConnectionManager: " + e.getMessage());
       }
   });
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyUtils.java
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Netty utilities.
+ */
+public class NettyUtils {
+    /**
+     * Convert a Netty {@link Future} to a {@link CompletableFuture}.
+     *
+     * @param nettyFuture Netty future.
+     * @param mapper Function that maps successfully resolved Netty future to 
a value for a CompletableFuture.
+     * @param <T> Resulting future type.
+     * @param <R> Netty future result type.
+     * @param <F> Netty future type.
+     * @return CompletableFuture.
+     */
+    public static <T, R, F extends Future<R>> CompletableFuture<T> 
toCompletableFuture(

Review comment:
       I think that the current approach is too generic. Looking at the actual 
use cases, I would suggest using two methods instead:
   1. One that accepts a `Future` and returns a `CompletableFuture`: `<R> 
CompletableFuture<R> toCompletableFuture(Future<R> future)`
   2. One that accepts a `ChannelFuture`: `CompletableFuture<Channel> 
toCompletableFuture(ChannelFuture future)`
   
   What do you think?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
##########
@@ -39,7 +41,10 @@
     @Override public ClusterService 
createClusterService(ClusterLocalConfiguration context) {
         var topologyService = new ScaleCubeTopologyService();
         var messagingService = new ScaleCubeMessagingService(topologyService);
-        var transportFactory = new 
DelegatingTransportFactory(messagingService);
+        MessageSerializationRegistry registry = 
context.getSerializationRegistry();
+        ConnectionManager connectionManager = new 
ConnectionManager(context.getPort(), registry);
+        connectionManager.start();

Review comment:
       I think that the connection manager should be started inside the 
`ClusterService#start` method.

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/message/MessageSerializationRegistryTest.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.message;
+
+import org.apache.ignite.network.NetworkConfigurationException;
+import org.apache.ignite.network.internal.MessageReader;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * {@link MessageSerializationRegistry} tests.
+ */
+class MessageSerializationRegistryTest {
+    /**
+     * Tests that a serialization factory can be registered.
+     */
+    @Test
+    public void testRegisterFactory() {
+        var registry = new MessageSerializationRegistry();
+
+        registry.registerFactory(Msg.TYPE, new MsgSerializationFactory());
+    }
+
+    /**
+     * Tests that a serialization factory can't be registered if there is an 
already registered serialization factory
+     * with the same direct type.
+     */
+    @Test
+    public void testRegisterFactoryWithSameType() {
+        var registry = new MessageSerializationRegistry();
+
+        registry.registerFactory(Msg.TYPE, new MsgSerializationFactory());
+
+        assertThrows(NetworkConfigurationException.class, () -> {
+            registry.registerFactory(Msg.TYPE, new MsgSerializationFactory());
+        });
+    }
+
+    /**
+     * Tests that a {@link MessageSerializer} and a {@link 
MessageDeserializer} can be created if a
+     * {@link MessageSerializationFactory} was registered.
+     */
+    @Test
+    public void testCreateSerializers() {
+        var registry = new MessageSerializationRegistry();
+
+        registry.registerFactory(Msg.TYPE, new MsgSerializationFactory());
+
+        assertNotNull(registry.createSerializer(Msg.TYPE));
+        assertNotNull(registry.createDeserializer(Msg.TYPE));
+    }
+
+    /**
+     * Tests that creation of a {@link MessageSerializer} or a {@link 
MessageDeserializer} fails if a
+     * {@link MessageSerializationFactory} was not registered.
+     */
+    @Test
+    public void testCreateSerializersIfNotRegistered() {
+        var registry = new MessageSerializationRegistry();
+
+        assertThrows(AssertionError.class, () -> 
registry.createSerializer(Msg.TYPE));
+        assertThrows(AssertionError.class, () -> 
registry.createDeserializer(Msg.TYPE));
+    }
+
+    /** */
+    static class Msg implements NetworkMessage {

Review comment:
       Although your current approach is fine, all these test classes can be 
replaced with mocks, resulting in less code:
   ```
   private static final byte TYPE = 0;
   
   private MessageSerializationFactory<?> mockFactory() {
       return new MessageSerializationFactory<>() {
           @Override public MessageDeserializer<NetworkMessage> 
createDeserializer() {
               return mock(MessageDeserializer.class);
           }
   
           @Override public MessageSerializer<NetworkMessage> 
createSerializer() {
               return mock(MessageSerializer.class);
           }
       };
   }
   ```

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link NettyServer}.
+ */
+public class NettyServerTest {
+    /**
+     * Tests a successfull server start scenario.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSuccessfullServerStart() throws Exception {

Review comment:
       ```suggestion
       public void testSuccessfulServerStart() throws Exception {
   ```

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link NettyServer}.
+ */
+public class NettyServerTest {
+    /**
+     * Tests a successfull server start scenario.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSuccessfullServerStart() throws Exception {
+        var channel = new EmbeddedServerChannel();
+
+        NettyServer server = getServer(channel.newSucceededFuture());
+
+        assertTrue(server.isRunning());
+    }
+
+    /**
+     * Tests a graceful server shutdown scenario.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testServerGracefulShutdown() throws Exception {
+        var channel = new EmbeddedServerChannel();
+
+        NettyServer server = getServer(channel.newSucceededFuture());
+
+        server.stop().join();
+
+        assertTrue(server.getBossGroup().isTerminated());
+        assertTrue(server.getWorkerGroup().isTerminated());
+    }
+
+    /**
+     * Tests a non-graceful server shutdown scenario.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testServerChannelClosedAbruptly() throws Exception {
+        var channel = new EmbeddedServerChannel();
+
+        NettyServer server = getServer(channel.newSucceededFuture());
+
+        channel.close();
+
+        assertTrue(server.getBossGroup().isShuttingDown());
+        assertTrue(server.getWorkerGroup().isShuttingDown());
+    }
+
+    /**
+     * Tests that a {@link NettyServer#start} method can be called only once.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testStartTwice() throws Exception {
+        var channel = new EmbeddedServerChannel();
+
+        NettyServer server = getServer(channel.newSucceededFuture());
+
+        assertThrows(IgniteInternalException.class, () -> {
+            server.start();
+        });
+    }
+
+    /**
+     * Creates a server from a backing {@link ChannelFuture}.
+     *
+     * @param future Channel future.
+     * @return NettyServer.
+     * @throws Exception If failed.
+     */
+    private NettyServer getServer(ChannelFuture future) throws Exception {

Review comment:
       ```suggestion
       private static NettyServer getServer(ChannelFuture future) throws 
Exception {
   ```

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link NettyServer}.
+ */
+public class NettyServerTest {
+    /**
+     * Tests a successfull server start scenario.

Review comment:
       ```suggestion
        * Tests a successful server start scenario.
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(ConnectionManager.class);
+
+    /** Latest version of the direct marshalling protocol. */
+    public static final byte DIRECT_PROTOCOL_VERSION = 1;
+
+    /** Client bootstrap. */
+    private final Bootstrap clientBootstrap;
+
+    /** Client socket channel handler event loop group. */
+    private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup();
+
+    /** Server. */
+    private final NettyServer server;
+
+    /** Channels. */
+    private final Map<SocketAddress, NettySender> channels = new 
ConcurrentHashMap<>();
+
+    /** Clients. */
+    private final Map<SocketAddress, NettyClient> clients = new 
ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private final List<BiConsumer<SocketAddress, NetworkMessage>> listeners = 
new CopyOnWriteArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param port Server port.
+     * @param registry Serialization registry.
+     */
+    public ConnectionManager(int port, MessageSerializationRegistry registry) {
+        this.serializationRegistry = registry;
+        this.server = new NettyServer(port, this::onNewIncomingChannel, 
this::onMessage, serializationRegistry);
+        this.clientBootstrap = NettyClient.createBootstrap(clientWorkerGroup, 
serializationRegistry, this::onMessage);
+    }
+
+    /**
+     * Starts the server.
+     *
+     * @throws IgniteInternalException If failed to start.
+     */
+    public void start() throws IgniteInternalException {
+        try {
+            server.start().join();
+        }
+        catch (CompletionException e) {
+            Throwable cause = e.getCause();
+            throw new IgniteInternalException("Failed to start server: " + 
cause.getMessage(), cause);
+        }
+    }
+
+    /**
+     * @return Server local address.
+     */
+    public SocketAddress getLocalAddress() {
+        return server.address();
+    }
+
+    /**
+     * Gets a {@link NettySender}, that sends data from this node to another 
node with the specified address.
+     * @param address Another node's address.
+     * @return Sender.
+     */
+    public CompletableFuture<NettySender> channel(SocketAddress address) {
+        NettySender channel = channels.compute(
+            address,
+            (addr, sender) -> (sender == null || !sender.isOpen()) ? null : 
sender
+        );
+
+        if (channel != null)
+            return CompletableFuture.completedFuture(channel);
+
+        NettyClient client = clients.compute(address, (addr, existingClient) 
-> {
+            if (existingClient != null && !existingClient.failedToConnect() && 
!existingClient.isDisconnected())
+                return existingClient;
+
+            return connect(addr);
+        });
+
+        return client.sender();
+    }
+
+    /**
+     * Callback that is called upon receiving of a new message.

Review comment:
       ```suggestion
        * Callback that is called upon receiving a new message.
   ```

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link NettyServer}.
+ */
+public class NettyServerTest {
+    /**
+     * Tests a successfull server start scenario.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSuccessfullServerStart() throws Exception {
+        var channel = new EmbeddedServerChannel();
+
+        NettyServer server = getServer(channel.newSucceededFuture());
+
+        assertTrue(server.isRunning());
+    }
+
+    /**
+     * Tests a graceful server shutdown scenario.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testServerGracefulShutdown() throws Exception {
+        var channel = new EmbeddedServerChannel();
+
+        NettyServer server = getServer(channel.newSucceededFuture());
+
+        server.stop().join();
+
+        assertTrue(server.getBossGroup().isTerminated());
+        assertTrue(server.getWorkerGroup().isTerminated());
+    }
+
+    /**
+     * Tests a non-graceful server shutdown scenario.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testServerChannelClosedAbruptly() throws Exception {
+        var channel = new EmbeddedServerChannel();
+
+        NettyServer server = getServer(channel.newSucceededFuture());
+
+        channel.close();
+
+        assertTrue(server.getBossGroup().isShuttingDown());
+        assertTrue(server.getWorkerGroup().isShuttingDown());
+    }
+
+    /**
+     * Tests that a {@link NettyServer#start} method can be called only once.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testStartTwice() throws Exception {
+        var channel = new EmbeddedServerChannel();
+
+        NettyServer server = getServer(channel.newSucceededFuture());
+
+        assertThrows(IgniteInternalException.class, () -> {

Review comment:
       `assertThrows(IgniteInternalException.class, server::start);`

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/message/MessageSerializationRegistryTest.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.message;
+
+import org.apache.ignite.network.NetworkConfigurationException;
+import org.apache.ignite.network.internal.MessageReader;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * {@link MessageSerializationRegistry} tests.
+ */
+class MessageSerializationRegistryTest {
+    /**
+     * Tests that a serialization factory can be registered.
+     */
+    @Test
+    public void testRegisterFactory() {
+        var registry = new MessageSerializationRegistry();
+
+        registry.registerFactory(Msg.TYPE, new MsgSerializationFactory());
+    }
+
+    /**
+     * Tests that a serialization factory can't be registered if there is an 
already registered serialization factory
+     * with the same direct type.
+     */
+    @Test
+    public void testRegisterFactoryWithSameType() {
+        var registry = new MessageSerializationRegistry();
+
+        registry.registerFactory(Msg.TYPE, new MsgSerializationFactory());
+
+        assertThrows(NetworkConfigurationException.class, () -> {
+            registry.registerFactory(Msg.TYPE, new MsgSerializationFactory());
+        });
+    }
+
+    /**
+     * Tests that a {@link MessageSerializer} and a {@link 
MessageDeserializer} can be created if a
+     * {@link MessageSerializationFactory} was registered.
+     */
+    @Test
+    public void testCreateSerializers() {
+        var registry = new MessageSerializationRegistry();
+
+        registry.registerFactory(Msg.TYPE, new MsgSerializationFactory());
+
+        assertNotNull(registry.createSerializer(Msg.TYPE));
+        assertNotNull(registry.createDeserializer(Msg.TYPE));
+    }
+
+    /**
+     * Tests that creation of a {@link MessageSerializer} or a {@link 
MessageDeserializer} fails if a
+     * {@link MessageSerializationFactory} was not registered.
+     */
+    @Test
+    public void testCreateSerializersIfNotRegistered() {
+        var registry = new MessageSerializationRegistry();
+
+        assertThrows(AssertionError.class, () -> 
registry.createSerializer(Msg.TYPE));
+        assertThrows(AssertionError.class, () -> 
registry.createDeserializer(Msg.TYPE));
+    }
+
+    /** */
+    static class Msg implements NetworkMessage {
+        /** */
+        static final byte TYPE = 0;
+
+        /** {@inheritDoc} */
+        @Override public short directType() {
+            return TYPE;
+        }
+    }
+
+    /** */
+    static class MsgSerializationFactory implements 
MessageSerializationFactory<Msg> {
+        /** {@inheritDoc} */
+        @Override public MessageDeserializer<Msg> createDeserializer() {
+            return new MessageDeserializer<Msg>() {
+                /** {@inheritDoc} */
+                @Override public boolean readMessage(MessageReader reader) 
throws MessageMappingException {
+                    return false;
+                }
+
+                /** {@inheritDoc} */
+                @Override public Class<Msg> klass() {
+                    return null;
+                }
+
+                /** {@inheritDoc} */
+                @Override public Msg getMessage() {
+                    return null;
+                }
+            };
+        }
+
+        /** {@inheritDoc} */
+        @Override public MessageSerializer<Msg> createSerializer() {
+            return (message, writer) -> false;
+        }
+    }
+}

Review comment:
       Missing newline at the end of the file.

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyServerTest.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ServerChannel;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link NettyServer}.
+ */
+public class NettyServerTest {
+    /**
+     * Tests a successfull server start scenario.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSuccessfullServerStart() throws Exception {
+        var channel = new EmbeddedServerChannel();
+
+        NettyServer server = getServer(channel.newSucceededFuture());
+
+        assertTrue(server.isRunning());
+    }
+
+    /**
+     * Tests a graceful server shutdown scenario.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testServerGracefulShutdown() throws Exception {
+        var channel = new EmbeddedServerChannel();
+
+        NettyServer server = getServer(channel.newSucceededFuture());
+
+        server.stop().join();
+
+        assertTrue(server.getBossGroup().isTerminated());
+        assertTrue(server.getWorkerGroup().isTerminated());
+    }
+
+    /**
+     * Tests a non-graceful server shutdown scenario.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testServerChannelClosedAbruptly() throws Exception {
+        var channel = new EmbeddedServerChannel();
+
+        NettyServer server = getServer(channel.newSucceededFuture());
+
+        channel.close();
+
+        assertTrue(server.getBossGroup().isShuttingDown());
+        assertTrue(server.getWorkerGroup().isShuttingDown());
+    }
+
+    /**
+     * Tests that a {@link NettyServer#start} method can be called only once.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testStartTwice() throws Exception {
+        var channel = new EmbeddedServerChannel();
+
+        NettyServer server = getServer(channel.newSucceededFuture());
+
+        assertThrows(IgniteInternalException.class, () -> {
+            server.start();
+        });
+    }
+
+    /**
+     * Creates a server from a backing {@link ChannelFuture}.
+     *
+     * @param future Channel future.
+     * @return NettyServer.
+     * @throws Exception If failed.
+     */
+    private NettyServer getServer(ChannelFuture future) throws Exception {

Review comment:
       You always pass `new EmbeddedServerChannel().newSucceededFuture()` as 
the parameter, so you can pass the Channel directly instead.

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/internal/AllTypesMessageGenerator.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal;
+
+import java.lang.reflect.Field;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.lang.IgniteUuidGenerator;
+import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+
+/**
+ * Generator for an {@link AllTypesMessage}.
+ */
+public class AllTypesMessageGenerator {
+    /**
+     * Generate a new {@link AllTypesMessage}.
+     *
+     * @param seed Random seed.
+     * @param nestedMsg {@code true} if nested messages should be generated, 
{@code false} otherwise.
+     * @return Message.
+     * @throws Exception If failed.
+     */
+    public static AllTypesMessage generate(long seed, boolean nestedMsg) {
+        try {
+            var random = new Random(seed);
+
+            var message = new AllTypesMessage();
+
+            Field[] fields = AllTypesMessage.class.getDeclaredFields();
+
+            for (Field field : fields) {
+                field.setAccessible(true);
+
+                TestFieldType annotation = 
field.getAnnotation(TestFieldType.class);
+
+                if (annotation != null) {
+                    field.set(message, randomValue(random, annotation.value(), 
nestedMsg));
+                }
+            }
+
+            if (nestedMsg) {
+                Field objectArrayField = 
AllTypesMessage.class.getDeclaredField("v");
+                objectArrayField.setAccessible(true);
+
+                Field collectionField = 
AllTypesMessage.class.getDeclaredField("w");
+                collectionField.setAccessible(true);
+
+                Field mapField = AllTypesMessage.class.getDeclaredField("x");
+                mapField.setAccessible(true);
+
+                Object[] array = IntStream.range(0, 10).mapToObj(unused -> 
generate(seed, false)).toArray();
+
+                objectArrayField.set(message, array);

Review comment:
       I think you got a little bit too excited using reflection here: 
`message.v = array;`. The same applies to all the other fields.

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/internal/AllTypesMessageGenerator.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal;
+
+import java.lang.reflect.Field;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.lang.IgniteUuidGenerator;
+import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+
+/**
+ * Generator for an {@link AllTypesMessage}.
+ */
+public class AllTypesMessageGenerator {
+    /**
+     * Generate a new {@link AllTypesMessage}.
+     *
+     * @param seed Random seed.
+     * @param nestedMsg {@code true} if nested messages should be generated, 
{@code false} otherwise.
+     * @return Message.
+     * @throws Exception If failed.
+     */
+    public static AllTypesMessage generate(long seed, boolean nestedMsg) {
+        try {
+            var random = new Random(seed);
+
+            var message = new AllTypesMessage();
+
+            Field[] fields = AllTypesMessage.class.getDeclaredFields();
+
+            for (Field field : fields) {
+                field.setAccessible(true);
+
+                TestFieldType annotation = 
field.getAnnotation(TestFieldType.class);
+
+                if (annotation != null) {
+                    field.set(message, randomValue(random, annotation.value(), 
nestedMsg));
+                }
+            }
+
+            if (nestedMsg) {
+                Field objectArrayField = 
AllTypesMessage.class.getDeclaredField("v");
+                objectArrayField.setAccessible(true);
+
+                Field collectionField = 
AllTypesMessage.class.getDeclaredField("w");
+                collectionField.setAccessible(true);
+
+                Field mapField = AllTypesMessage.class.getDeclaredField("x");
+                mapField.setAccessible(true);
+
+                Object[] array = IntStream.range(0, 10).mapToObj(unused -> 
generate(seed, false)).toArray();
+
+                objectArrayField.set(message, array);
+
+                List<Object> collection = IntStream.range(0, 10)
+                    .mapToObj(unused -> generate(seed, false))
+                    .collect(Collectors.toList());
+
+                collectionField.set(message, collection);
+
+                Map<String, Object> map = IntStream.range(0, 10)
+                    .boxed()
+                    .collect(Collectors.toMap(String::valueOf, unused -> 
generate(seed, false)));
+
+                mapField.set(message, map);
+            }
+
+            return message;
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Generate random value.
+     *
+     * @param random Seeded random.
+     * @param type Value type.
+     * @param nestedMsg {@code true} if nested messages should be generated, 
{@code false} otherwise.
+     * @return Random value.
+     * @throws Exception If failed.
+     */
+    private static Object randomValue(Random random, MessageCollectionItemType 
type, boolean nestedMsg) throws Exception {
+        Object value = null;

Review comment:
       This method will be much shorter if you remove this variable and use 
`return` inside branches instead

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/internal/netty/InboundDecoderTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import org.apache.ignite.network.internal.AllTypesMessage;
+import org.apache.ignite.network.internal.AllTypesMessageGenerator;
+import org.apache.ignite.network.internal.AllTypesMessageSerializationFactory;
+import org.apache.ignite.network.internal.direct.DirectMessageWriter;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.MessageSerializer;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for {@link InboundDecoder}.
+ */
+public class InboundDecoderTest {
+    /**
+     * Tests that an {@link InboundDecoder} can successfully read a message 
with all types supported
+     * by direct marshalling.
+     *
+     * @param seed Random seed.
+     * @throws Exception If failed.
+     */
+    @ParameterizedTest
+    @MethodSource("messageGenerationSeed")
+    public void test(long seed) throws Exception {
+        var registry = new MessageSerializationRegistry();
+
+        AllTypesMessage msg = AllTypesMessageGenerator.generate(seed, true);
+
+        registry.registerFactory(msg.directType(), new 
AllTypesMessageSerializationFactory());
+
+        var channel = new EmbeddedChannel(new InboundDecoder(registry));
+
+        var writer = new DirectMessageWriter(registry, 
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+
+        MessageSerializer<NetworkMessage> serializer = 
registry.createSerializer(msg.directType());
+
+        UnpooledByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT;
+
+        ByteBuffer buf = ByteBuffer.allocate(10_000);
+
+        AllTypesMessage output;
+
+        do {
+            buf.clear();
+
+            writer.setBuffer(buf);

Review comment:
       Do you need to call this method on every iteration?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty client channel wrapper.
+ */
+public class NettyClient {
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Destination host. */
+    private final String host;
+
+    /** Destination port. */
+    private final int port;
+
+    /** Future that resolves when client channel is opened. */
+    private final CompletableFuture<NettySender> clientFuture = new 
CompletableFuture<>();
+
+    /** Client channel. */
+    private SocketChannel channel;
+
+    public NettyClient(
+        String host,
+        int port,
+        MessageSerializationRegistry serializationRegistry
+    ) {
+        this.host = host;
+        this.port = port;
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /**
+     * Start client.
+     *
+     * @return Future that resolves when client channel is opened.
+     */
+    public CompletableFuture<NettySender> start(Bootstrap bootstrap) {
+        bootstrap.connect(host, port).addListener((ChannelFutureListener) 
connect -> {
+            this.channel = (SocketChannel) connect.channel();
+
+            if (connect.isSuccess())
+                clientFuture.complete(new NettySender(channel, 
serializationRegistry));
+            else
+                clientFuture.completeExceptionally(connect.cause());
+        });
+
+        return clientFuture;
+    }
+
+    /**
+     * @return Client start future.
+     */
+    public CompletableFuture<NettySender> sender() {
+        return clientFuture;
+    }
+
+    /**
+     * Stop client.
+     */
+    public void stop() {
+        this.channel.close().awaitUninterruptibly();
+    }
+
+    /**
+     * @return {@code true} if client failed to connect to remote server, 
{@code false} otherwise.
+     */
+    public boolean failedToConnect() {
+        return clientFuture.isCompletedExceptionally();
+    }
+
+    /**
+     * @return {@code true} if client lost connection, {@code false} otherwise.
+     */
+    public boolean isDisconnected() {
+        return channel != null && !channel.isOpen();
+    }
+
+    /**
+     * Creates {@link Bootstrap} for clients, providing channel handlers and 
options.
+     *
+     * @param eventLoopGroup Event loop group for channel handling.
+     * @param serializationRegistry Serialization registry.
+     * @param messageListener Message listener.
+     * @return Bootstrap for clients.
+     */
+    public static Bootstrap createBootstrap(

Review comment:
       What about this comment?

##########
File path: 
modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for {@link ConnectionManager}.
+ */
+public class ConnectionManagerTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** */
+    @AfterEach
+    void tearDown() {
+        startedManagers.forEach(ConnectionManager::stop);
+
+        startedManagers.clear();
+    }
+
+    /**
+     * Tests that a message is sent successfully using ConnectionManager.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSentSuccessfully() throws Exception {
+        String msgText = "test";
+
+        var latch = new CountDownLatch(1);
+
+        int port1 = 4000;
+        int port2 = 4001;
+
+        var manager1 = startManager(port1);
+        var manager2 = startManager(port2);
+
+        manager2.addListener((address, message) -> {
+            if (message instanceof TestMessage && 
msgText.equals(((TestMessage) message).msg()))
+                latch.countDown();
+        });
+
+        NettySender sender = manager1.channel(address(port2)).get();
+
+        TestMessage testMessage = new TestMessage(msgText, new HashMap<>());
+
+        sender.send(testMessage).join();
+
+        latch.await(3, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Tests that after a channel was closed, a new channel is opened upon a 
request.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCanReconnectAfterFail() throws Exception {
+        String msgText = "test";
+
+        var latch = new CountDownLatch(1);
+
+        int port1 = 4000;
+        int port2 = 4001;
+
+        var manager1 = startManager(port1);
+        var manager2 = startManager(port2);
+
+        NettySender sender = manager1.channel(address(port2)).get();
+
+        TestMessage testMessage = new TestMessage(msgText, new HashMap<>());
+
+        manager2.stop();
+
+        final var finalSender = sender;
+
+        assertThrows(ClosedChannelException.class, () -> {
+            try {
+                finalSender.send(testMessage).join();
+            }
+            catch (CompletionException e) {
+                throw e.getCause();
+            }
+        });
+
+        manager2 = startManager(port2);
+
+        manager2.addListener((address, message) -> {
+            if (message instanceof TestMessage && 
msgText.equals(((TestMessage) message).msg()))
+                latch.countDown();
+        });
+
+        sender = manager1.channel(address(port2)).get();
+
+        sender.send(testMessage).join();
+    }
+
+    /**
+     * Create an unresolved {@link InetSocketAddress} with "localhost" as a 
host.
+     *
+     * @param port Port.
+     * @return Address.
+     */
+    private InetSocketAddress address(int port) {
+        return InetSocketAddress.createUnresolved("localhost", port);

Review comment:
       Will you fix this comment, or should we resolve it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to