ibessonov commented on a change in pull request #102:
URL: https://github.com/apache/ignite-3/pull/102#discussion_r628104351



##########
File path: 
modules/network/src/integrationTest/java/org/apache/ignite/network/TestMessageSerializationFactory.java
##########
@@ -84,7 +84,7 @@ public TestMessage getMessage() {
     @Override public MessageSerializer<TestMessage> createSerializer() {
         return (message, writer) -> {
             if (!writer.isHeaderWritten()) {
-                if (!writer.writeHeader(message.directType(), (byte) 1))
+                if (!writer.writeHeader(message.directType(), (byte) 2))

Review comment:
       Can this be a constant somewhere?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(ConnectionManager.class);
+
+    /** Latest version of the direct marshalling protocol. */
+    public static final byte DIRECT_PROTOCOL_VERSION = 1;
+
+    /** Client bootstrap. */
+    private final Bootstrap clientBootstrap;
+
+    /** Client socket channel handler event loop group. */
+    private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup();
+
+    /** Server. */
+    private final NettyServer server;
+
+    /** Channels. */
+    private final Map<SocketAddress, NettySender> channels = new 
ConcurrentHashMap<>();
+
+    /** Clients. */
+    private final Map<SocketAddress, NettyClient> clients = new 
ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private final List<BiConsumer<SocketAddress, NetworkMessage>> listeners = 
new CopyOnWriteArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param port Server port.
+     * @param registry Serialization registry.
+     */
+    public ConnectionManager(int port, MessageSerializationRegistry registry) {
+        this.serializationRegistry = registry;
+        this.server = new NettyServer(port, this::onNewIncomingChannel, 
this::onMessage, serializationRegistry);
+        this.clientBootstrap = NettyClient.createBootstrap(clientWorkerGroup, 
serializationRegistry, this::onMessage);
+    }
+
+    /**
+     * Starts the server.
+     *
+     * @throws IgniteInternalException If failed to start.
+     */
+    public void start() throws IgniteInternalException {
+        try {
+            server.start().join();
+        }
+        catch (CompletionException e) {
+            Throwable cause = e.getCause();
+            throw new IgniteInternalException("Failed to start server: " + 
cause.getMessage(), cause);
+        }
+    }
+
+    /**
+     * @return Server local address.
+     */
+    public SocketAddress getLocalAddress() {
+        return server.address();
+    }
+
+    /**
+     * Gets a {@link NettySender}, that sends data from this node to another 
node with the specified address.
+     * @param address Another node's address.
+     * @return Sender.
+     */
+    public CompletableFuture<NettySender> channel(SocketAddress address) {
+        NettySender channel = channels.compute(
+            address,
+            (addr, sender) -> (sender == null || !sender.isOpen()) ? null : 
sender
+        );
+
+        if (channel != null)
+            return CompletableFuture.completedFuture(channel);
+
+        NettyClient client = clients.compute(address, (addr, existingClient) 
-> {
+            if (existingClient != null && !existingClient.failedToConnect() && 
!existingClient.isDisconnected())
+                return existingClient;
+
+            return connect(addr);
+        });
+
+        return client.sender();
+    }
+
+    /**
+     * Callback that is called upon receiving of a new message.
+     *
+     * @param from Source of the message.
+     * @param message New message.
+     */
+    private void onMessage(SocketAddress from, NetworkMessage message) {
+        listeners.forEach(consumer -> consumer.accept(from, message));
+    }
+
+    /**
+     * Callback that is called upon new client connected to the server.
+     *
+     * @param channel Channel from client to this {@link #server}.
+     */
+    private void onNewIncomingChannel(NettySender channel) {
+        SocketAddress remoteAddress = channel.remoteAddress();
+        channels.put(remoteAddress, channel);
+    }
+
+    /**
+     * Create new client from this node to specified address.
+     *
+     * @param address Target address.
+     * @return New netty client.
+     */
+    private NettyClient connect(SocketAddress address) {
+        NettyClient client = new NettyClient(
+            address,
+            serializationRegistry
+        );
+
+        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
+            if (throwable != null)
+                clients.remove(address);
+            else
+                channels.put(address, sender);
+        });
+
+        return client;
+    }
+
+    /**
+     * Add incoming message listener.
+     *
+     * @param listener Message listener.
+     */
+    public void addListener(BiConsumer<SocketAddress, NetworkMessage> 
listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Stops the server and all clients.
+     */
+    public void stop() {
+         var stream = Stream.concat(

Review comment:
       Please use explicit type

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import java.net.SocketAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(ConnectionManager.class);
+
+    /** Latest version of the direct marshalling protocol. */
+    public static final byte DIRECT_PROTOCOL_VERSION = 1;
+
+    /** Client bootstrap. */
+    private final Bootstrap clientBootstrap;
+
+    /** Client socket channel handler event loop group. */
+    private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup();
+
+    /** Server. */
+    private final NettyServer server;
+
+    /** Channels. */
+    private final Map<SocketAddress, NettySender> channels = new 
ConcurrentHashMap<>();
+
+    /** Clients. */
+    private final Map<SocketAddress, NettyClient> clients = new 
ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private final List<BiConsumer<SocketAddress, NetworkMessage>> listeners = 
new CopyOnWriteArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param port Server port.
+     * @param registry Serialization registry.
+     */
+    public ConnectionManager(int port, MessageSerializationRegistry registry) {
+        this.serializationRegistry = registry;
+        this.server = new NettyServer(port, this::onNewIncomingChannel, 
this::onMessage, serializationRegistry);
+        this.clientBootstrap = NettyClient.createBootstrap(clientWorkerGroup, 
serializationRegistry, this::onMessage);
+    }
+
+    /**
+     * Starts the server.
+     *
+     * @throws IgniteInternalException If failed to start.
+     */
+    public void start() throws IgniteInternalException {
+        try {
+            server.start().join();
+        }
+        catch (CompletionException e) {
+            Throwable cause = e.getCause();
+            throw new IgniteInternalException("Failed to start server: " + 
cause.getMessage(), cause);
+        }
+    }
+
+    /**
+     * @return Server local address.
+     */
+    public SocketAddress getLocalAddress() {
+        return server.address();
+    }
+
+    /**
+     * Gets a {@link NettySender}, that sends data from this node to another 
node with the specified address.
+     * @param address Another node's address.
+     * @return Sender.
+     */
+    public CompletableFuture<NettySender> channel(SocketAddress address) {
+        NettySender channel = channels.compute(
+            address,
+            (addr, sender) -> (sender == null || !sender.isOpen()) ? null : 
sender
+        );
+
+        if (channel != null)
+            return CompletableFuture.completedFuture(channel);
+
+        NettyClient client = clients.compute(address, (addr, existingClient) 
-> {
+            if (existingClient != null && !existingClient.failedToConnect() && 
!existingClient.isDisconnected())
+                return existingClient;
+
+            return connect(addr);
+        });
+
+        return client.sender();
+    }
+
+    /**
+     * Callback that is called upon receiving of a new message.
+     *
+     * @param from Source of the message.
+     * @param message New message.
+     */
+    private void onMessage(SocketAddress from, NetworkMessage message) {
+        listeners.forEach(consumer -> consumer.accept(from, message));
+    }
+
+    /**
+     * Callback that is called upon new client connected to the server.
+     *
+     * @param channel Channel from client to this {@link #server}.
+     */
+    private void onNewIncomingChannel(NettySender channel) {
+        SocketAddress remoteAddress = channel.remoteAddress();
+        channels.put(remoteAddress, channel);
+    }
+
+    /**
+     * Create new client from this node to specified address.
+     *
+     * @param address Target address.
+     * @return New netty client.
+     */
+    private NettyClient connect(SocketAddress address) {
+        NettyClient client = new NettyClient(
+            address,
+            serializationRegistry
+        );
+
+        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
+            if (throwable != null)
+                clients.remove(address);
+            else
+                channels.put(address, sender);
+        });
+
+        return client;
+    }
+
+    /**
+     * Add incoming message listener.
+     *
+     * @param listener Message listener.
+     */
+    public void addListener(BiConsumer<SocketAddress, NetworkMessage> 
listener) {
+        listeners.add(listener);
+    }
+
+    /**
+     * Stops the server and all clients.
+     */
+    public void stop() {
+         var stream = Stream.concat(
+            clients.values().stream().map(NettyClient::stop),
+            Stream.of(server.stop())
+        );
+
+         var stopFut = 
CompletableFuture.allOf(stream.toArray(CompletableFuture<?>[]::new));

Review comment:
       Please use explicit type here as well

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeClusterServiceFactory.java
##########
@@ -39,7 +41,10 @@
     @Override public ClusterService 
createClusterService(ClusterLocalConfiguration context) {
         var topologyService = new ScaleCubeTopologyService();
         var messagingService = new ScaleCubeMessagingService(topologyService);
-        var transportFactory = new 
DelegatingTransportFactory(messagingService);
+        MessageSerializationRegistry registry = 
context.getSerializationRegistry();
+        ConnectionManager connectionManager = new 
ConnectionManager(context.getPort(), registry);

Review comment:
       Can you add some empty lines?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/message/ScaleCubeMessageSerializationFactory.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube.message;
+
+import java.util.Map;
+import org.apache.ignite.network.internal.MessageReader;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageMappingException;
+import org.apache.ignite.network.message.MessageSerializationFactory;
+import org.apache.ignite.network.message.MessageSerializer;
+import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+
+/**
+ * Serialization factory for {@link ScaleCubeMessage}.
+ * TODO: IGNITE-14649 This class should be generated.
+ */
+public class ScaleCubeMessageSerializationFactory implements 
MessageSerializationFactory<ScaleCubeMessage> {
+    /** {@inheritDoc} */
+    @Override public MessageDeserializer<ScaleCubeMessage> 
createDeserializer() {
+        return new MessageDeserializer<>() {
+            /** */
+            ScaleCubeMessage obj;

Review comment:
       Not private. Why?

##########
File path: 
modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettyClient;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.internal.netty.NettyServer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link ConnectionManager}.
+ */
+public class ConnectionManagerTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** */
+    @AfterEach
+    void tearDown() {

Review comment:
       a) please make it final.
   b) you can use "ConnectionManager::stop" in methods implementation.

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/message/ScaleCubeMessageSerializationFactory.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube.message;
+
+import java.util.Map;
+import org.apache.ignite.network.internal.MessageReader;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageMappingException;
+import org.apache.ignite.network.message.MessageSerializationFactory;
+import org.apache.ignite.network.message.MessageSerializer;
+import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+
+/**
+ * Serialization factory for {@link ScaleCubeMessage}.
+ * TODO: IGNITE-14649 This class should be generated.
+ */
+public class ScaleCubeMessageSerializationFactory implements 
MessageSerializationFactory<ScaleCubeMessage> {
+    /** {@inheritDoc} */
+    @Override public MessageDeserializer<ScaleCubeMessage> 
createDeserializer() {
+        return new MessageDeserializer<>() {
+            /** */
+            ScaleCubeMessage obj;
+
+            /** */
+            byte[] array;
+
+            /** */
+            Map<String, String> headers;
+
+            /** {@inheritDoc} */
+            @Override public boolean readMessage(MessageReader reader) throws 
MessageMappingException {
+                if (!reader.beforeMessageRead())
+                    return false;
+
+                switch (reader.state()) {
+                    case 0:
+                        array = reader.readByteArray("array");
+
+                        if (!reader.isLastRead())
+                            return false;
+
+                        reader.incrementState();
+
+                    //noinspection fallthrough
+                    case 1:
+                        headers = reader.readMap("headers", 
MessageCollectionItemType.STRING, MessageCollectionItemType.STRING, false);
+
+                        if (!reader.isLastRead())
+                            return false;
+
+                        reader.incrementState();
+
+                }
+
+                obj = new ScaleCubeMessage(array, headers);
+
+                return reader.afterMessageRead(ScaleCubeMessage.class);
+            }
+
+            /** {@inheritDoc} */
+            @Override public Class<ScaleCubeMessage> klass() {
+                return ScaleCubeMessage.class;
+            }
+
+            /** {@inheritDoc} */
+            @Override public ScaleCubeMessage getMessage() {
+                return obj;
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public MessageSerializer<ScaleCubeMessage> createSerializer() {
+        return (message, writer) -> {
+            if (!writer.isHeaderWritten()) {
+                if (!writer.writeHeader(message.directType(), (byte) 2))

Review comment:
       Again, why do we have to hardcode this constant?

##########
File path: 
modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.internal.netty.NettySender;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for {@link ConnectionManager}.
+ */
+public class ConnectionManagerTest {
+    /** Started connection managers. */
+    private final List<ConnectionManager> startedManagers = new ArrayList<>();
+
+    /** */
+    @AfterEach
+    void tearDown() {
+        startedManagers.forEach(ConnectionManager::stop);
+
+        startedManagers.clear();
+    }
+
+    /**
+     * Tests that a message is sent successfuly using ConnectionManager.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSentSuccessfully() throws Exception {
+        String msgText = "test";
+
+        var latch = new CountDownLatch(1);
+
+        int port1 = 4000;
+        int port2 = 4001;
+
+        var manager1 = startManager(port1);
+        var manager2 = startManager(port2);
+
+        manager2.addListener((address, message) -> {
+            if (message instanceof TestMessage && 
msgText.equals(((TestMessage) message).msg()))
+                latch.countDown();
+        });
+
+        NettySender sender = manager1.channel(address(port2)).get();
+
+        TestMessage testMessage = new TestMessage(msgText, new HashMap<>());
+
+        sender.send(testMessage).join();
+
+        latch.await(3, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Tests that after a channel was closed, a new channel is opened upon a 
request.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCanReconnectAfterFail() throws Exception {
+        String msgText = "test";
+
+        var latch = new CountDownLatch(1);
+
+        int port1 = 4000;
+        int port2 = 4001;
+
+        var manager1 = startManager(port1);
+        var manager2 = startManager(port2);
+
+        NettySender sender = manager1.channel(address(port2)).get();
+
+        TestMessage testMessage = new TestMessage(msgText, new HashMap<>());
+
+        manager2.stop();
+
+        final var finalSender = sender;

Review comment:
       Yeah, looks weird. Why "final var" instead of explicit "NettySender"?

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/internal/AllTypesMessage.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.network.message.NetworkMessage;
+import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+
+/**
+ * Message with all types supported by Direct Marshalling.
+ */
+public class AllTypesMessage implements NetworkMessage {
+    /** */
+    @TestFieldType(MessageCollectionItemType.BYTE)
+    byte a;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.SHORT)
+    short b;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.INT)
+    int c;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.LONG)
+    long d;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.FLOAT)
+    float e;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.DOUBLE)
+    double f;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.CHAR)
+    char g;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.BOOLEAN)
+    boolean h;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.BYTE_ARR)
+    byte[] i;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.SHORT_ARR)
+    short[] j;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.INT_ARR)
+    int[] k;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.LONG_ARR)
+    long[] l;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.FLOAT_ARR)
+    float[] m;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.DOUBLE_ARR)
+    double[] n;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.CHAR_ARR)
+    char[] o;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.BOOLEAN_ARR)
+    boolean[] p;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.STRING)
+    String q;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.BIT_SET)
+    BitSet r;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.UUID)
+    UUID s;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.IGNITE_UUID)
+    IgniteUuid t;
+
+    /** */
+    @TestFieldType(MessageCollectionItemType.MSG)
+    NetworkMessage u;
+
+    /** */
+    Object[] v;
+
+    /** */
+    Collection<?> w;
+
+    /** */
+    Map<?, ?> x;
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 5555;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o1) {
+        if (this == o1) return true;
+        if (o1 == null || getClass() != o1.getClass()) return false;
+        AllTypesMessage message = (AllTypesMessage) o1;
+        return a == message.a && b == message.b && c == message.c && d == 
message.d && Float.compare(message.e, e) == 0 && Double.compare(message.f, f) 
== 0 && g == message.g && h == message.h && Arrays.equals(i, message.i) && 
Arrays.equals(j, message.j) && Arrays.equals(k, message.k) && Arrays.equals(l, 
message.l) && Arrays.equals(m, message.m) && Arrays.equals(n, message.n) && 
Arrays.equals(o, message.o) && Arrays.equals(p, message.p) && Objects.equals(q, 
message.q) && Objects.equals(r, message.r) && Objects.equals(s, message.s) && 
Objects.equals(t, message.t) && Objects.equals(u, message.u) && 
Arrays.equals(v, message.v) && Objects.equals(w, message.w) && 
Objects.equals(x, message.x);

Review comment:
       Oh gods...

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/message/MessageSerializationRegistryTest.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.message;
+
+import org.apache.ignite.network.NetworkConfigurationException;
+import org.apache.ignite.network.internal.MessageReader;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * {@link MessageSerializationRegistry} tests.
+ */
+class MessageSerializationRegistryTest {
+    /**
+     * Tests that a serialization factory can be registered.
+     */
+    @Test
+    public void testRegisterFactory() {
+        var registry = new MessageSerializationRegistry();
+
+        registry.registerFactory(Msg.TYPE, new MsgSerializationFactory());
+    }
+
+    /**
+     * Tests that a serialization factory can't be registered if there is an 
already registered serialization factory
+     * with the same direct type.
+     */
+    @Test
+    public void testRegisterFactoryWithSameType() {
+        var registry = new MessageSerializationRegistry();
+
+        registry.registerFactory(Msg.TYPE, new MsgSerializationFactory());
+
+        assertThrows(NetworkConfigurationException.class, () -> {
+            registry.registerFactory(Msg.TYPE, new MsgSerializationFactory());
+        });
+    }
+
+    /**
+     * Tests that a {@link MessageSerializer} and a {@link 
MessageDeserializer} can be created if a
+     * {@link MessageSerializationFactory} was registered.
+     */
+    @Test
+    public void testCreateSerializers() {
+        var registry = new MessageSerializationRegistry();
+
+        registry.registerFactory(Msg.TYPE, new MsgSerializationFactory());
+
+        assertNotNull(registry.createSerializer(Msg.TYPE));
+        assertNotNull(registry.createDeserializer(Msg.TYPE));
+    }
+
+    /**
+     * Tests that creation of a {@link MessageSerializer} or a {@link 
MessageDeserializer} fails if a
+     * {@link MessageSerializationFactory} was not registered.
+     */
+    @Test
+    public void testCreateSerializersIfNotRegistered() {
+        var registry = new MessageSerializationRegistry();
+
+        assertThrows(AssertionError.class, () -> 
registry.createSerializer(Msg.TYPE));
+        assertThrows(AssertionError.class, () -> 
registry.createDeserializer(Msg.TYPE));
+    }
+
+    /** */
+    static class Msg implements NetworkMessage {
+        /** */
+        static final byte TYPE = 0;
+
+        /** {@inheritDoc} */
+        @Override public short directType() {
+            return TYPE;
+        }
+    }
+
+    /** */
+    static class MsgSerializationFactory implements 
MessageSerializationFactory<Msg> {
+        /** {@inheritDoc} */
+        @Override public MessageDeserializer<Msg> createDeserializer() {
+            return new MessageDeserializer<Msg>() {
+                /** {@inheritDoc} */
+                @Override public boolean readMessage(MessageReader reader) 
throws MessageMappingException {
+                    return false;
+                }
+
+                /** {@inheritDoc} */
+                @Override public Class<Msg> klass() {
+                    return null;
+                }
+
+                /** {@inheritDoc} */
+                @Override public Msg getMessage() {
+                    return null;
+                }
+            };
+        }
+
+        /** {@inheritDoc} */
+        @Override public MessageSerializer<Msg> createSerializer() {
+            return (message, writer) -> false;
+        }
+    }
+}

Review comment:
       Please add empty line

##########
File path: 
modules/network/src/test/java/org/apache/ignite/network/internal/netty/NettyClientTest.java
##########
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.embedded.EmbeddedChannel;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link NettyClient}.
+ */
+class NettyClientTest {

Review comment:
       Why not public?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to