sashapolo commented on a change in pull request #102:
URL: https://github.com/apache/ignite-3/pull/102#discussion_r619043258



##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty client channel wrapper.
+ */
+public class NettyClient {
+    /** Socket channel bootstrapper. */
+    private final Bootstrap bootstrap = new Bootstrap();
+
+    /** Socket channel handler event loop group. */
+    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Destination host. */
+    private final String host;
+
+    /** Destination port. */
+    private final int port;
+
+    /** Incoming message listener. */
+    private final BiConsumer<InetSocketAddress, NetworkMessage> 
messageListener;
+
+    /** Future that resolves when client channel is opened. */
+    private final CompletableFuture<NettySender> clientFuture = new 
CompletableFuture<>();
+
+    /** Client socket channel. */
+    private Channel channel;
+
+    public NettyClient(String host, int port, MessageSerializationRegistry 
serializationRegistry, BiConsumer<InetSocketAddress, NetworkMessage> listener) {
+        this.host = host;
+        this.port = port;
+        this.serializationRegistry = serializationRegistry;
+        this.messageListener = listener;
+    }
+
+    /**
+     * Start client.
+     *
+     * @return Future that resolves when client channel is opened.
+     */
+    public CompletableFuture<NettySender> start() {
+        bootstrap.group(workerGroup)
+            .channel(NioSocketChannel.class)
+            /** See {@link NettyServer#start} for netty configuration details. 
*/
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .handler(new ChannelInitializer<SocketChannel>() {
+                /** {@inheritDoc} */
+                @Override public void initChannel(SocketChannel ch)
+                    throws Exception {
+                    ch.pipeline().addLast(
+                        new InboundDecoder(serializationRegistry),
+                        new MessageHandler(messageListener),
+                        new ChunkedWriteHandler()
+                    );
+                }
+        });
+
+        ChannelFuture connectFuture = bootstrap.connect(host, port);
+
+        connectFuture.addListener(connect -> {
+            this.channel = connectFuture.channel();
+            if (connect.isSuccess())
+                clientFuture.complete(new NettySender(channel, 
serializationRegistry));
+            else {
+                Throwable cause = connect.cause();
+                clientFuture.completeExceptionally(cause);
+            }
+
+            // Shutdown event loop group when channel is closed.
+            channel.closeFuture().addListener(close -> {

Review comment:
       Nit: the listener body is a single statement, so the braces can be dropped:
       `channel.closeFuture().addListener(close -> workerGroup.shutdownGracefully());`

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty client channel wrapper.
+ */
+public class NettyClient {
+    /** Socket channel bootstrapper. */
+    private final Bootstrap bootstrap = new Bootstrap();
+
+    /** Socket channel handler event loop group. */
+    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Destination host. */
+    private final String host;
+
+    /** Destination port. */
+    private final int port;
+
+    /** Incoming message listener. */
+    private final BiConsumer<InetSocketAddress, NetworkMessage> 
messageListener;
+
+    /** Future that resolves when client channel is opened. */
+    private final CompletableFuture<NettySender> clientFuture = new 
CompletableFuture<>();
+
+    /** Client socket channel. */
+    private Channel channel;
+
+    public NettyClient(String host, int port, MessageSerializationRegistry 
serializationRegistry, BiConsumer<InetSocketAddress, NetworkMessage> listener) {
+        this.host = host;
+        this.port = port;
+        this.serializationRegistry = serializationRegistry;
+        this.messageListener = listener;
+    }
+
+    /**
+     * Start client.
+     *
+     * @return Future that resolves when client channel is opened.
+     */
+    public CompletableFuture<NettySender> start() {
+        bootstrap.group(workerGroup)
+            .channel(NioSocketChannel.class)
+            /** See {@link NettyServer#start} for netty configuration details. 
*/
+            .option(ChannelOption.SO_KEEPALIVE, true)
+            .handler(new ChannelInitializer<SocketChannel>() {
+                /** {@inheritDoc} */
+                @Override public void initChannel(SocketChannel ch)
+                    throws Exception {
+                    ch.pipeline().addLast(
+                        new InboundDecoder(serializationRegistry),
+                        new MessageHandler(messageListener),
+                        new ChunkedWriteHandler()
+                    );
+                }
+        });
+
+        ChannelFuture connectFuture = bootstrap.connect(host, port);
+
+        connectFuture.addListener(connect -> {
+            this.channel = connectFuture.channel();
+            if (connect.isSuccess())
+                clientFuture.complete(new NettySender(channel, 
serializationRegistry));
+            else {
+                Throwable cause = connect.cause();
+                clientFuture.completeExceptionally(cause);
+            }
+
+            // Shutdown event loop group when channel is closed.
+            channel.closeFuture().addListener(close -> {
+               workerGroup.shutdownGracefully();
+            });
+        });
+
+        return clientFuture;
+    }
+
+    /**
+     * @return Client start future.
+     */
+    public CompletableFuture<NettySender> sender() {
+        return clientFuture;
+    }
+
+    /**
+     * Stop client.
+     */
+    public void stop() {
+        this.channel.close().awaitUninterruptibly();

Review comment:
       `awaitUninterruptibly` will not rethrow any exception if the close fails; should we use `syncUninterruptibly` instead?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.network.internal.MessageReader;
+import org.apache.ignite.network.internal.direct.DirectMessageReader;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ *
+ */
+public class InboundDecoder extends ByteToMessageDecoder {
+    /** Message reader channel attribute key. */
+    private static final AttributeKey<MessageReader> READER_KEY = 
AttributeKey.valueOf("READER");
+
+    /** Message deserializer channel attribute key. */
+    private static final AttributeKey<MessageDeserializer<NetworkMessage>> 
DESERIALIZER_KEY = AttributeKey.valueOf("DESERIALIZER");
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /**
+     * Constructor.
+     *
+     * @param serializationRegistry Serialization registry.
+     */
+    public InboundDecoder(MessageSerializationRegistry serializationRegistry) {
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, 
List<Object> out) throws Exception {
+        ByteBuffer buffer = in.nioBuffer();
+
+        Attribute<MessageReader> readerAttr = ctx.channel().attr(READER_KEY);
+        MessageReader reader = readerAttr.get();
+
+        if (reader == null)
+            readerAttr.set(reader = new 
DirectMessageReader(serializationRegistry, (byte) 1));
+
+        Attribute<MessageDeserializer<NetworkMessage>> messageAttr = 
ctx.channel().attr(DESERIALIZER_KEY);
+
+        while (buffer.hasRemaining()) {
+            MessageDeserializer<NetworkMessage> msg = messageAttr.get();
+
+            try {
+                // Read message type.
+                if (msg == null && buffer.remaining() >= 
NetworkMessage.DIRECT_TYPE_SIZE) {
+                    byte b0 = buffer.get();
+                    byte b1 = buffer.get();
+
+                    msg = 
serializationRegistry.createDeserializer(makeMessageType(b0, b1));
+                }
+
+                boolean finished = false;
+
+                // Read message if buffer has remaining data.
+                if (msg != null && buffer.hasRemaining()) {
+                    reader.setCurrentReadClass(msg.klass());
+                    reader.setBuffer(buffer);
+
+                    finished = msg.readMessage(reader);
+                }
+
+                // Set read position to Netty's ByteBuf.
+                in.readerIndex(buffer.position());
+
+                if (finished) {
+                    reader.reset();
+                    messageAttr.set(null);
+
+                    out.add(msg.getMessage());
+                }
+                else
+                    messageAttr.set(msg);
+            }
+            catch (Throwable e) {
+                System.err.println("Failed to read message [msg=" + msg +

Review comment:
       I think using a logger would be more appropriate here than printing to `System.err`.

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty server channel wrapper.
+ */
+public class NettyServer {
+    /** {@link ServerSocketChannel} bootstrapper. */
+    private final ServerBootstrap bootstrap = new ServerBootstrap();
+
+    /** Socket accepter event loop group. */
+    private final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
+
+    /** Socket handler event loop group. */
+    private final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+    /** Server port. */
+    private final int port;
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Incoming message listener. */
+    private final BiConsumer<InetSocketAddress, NetworkMessage> 
messageListener;
+
+    /** Server socket channel. */
+    private ServerSocketChannel channel;
+
+    /** New connections listener. */
+    private final Consumer<SocketChannel> newConnectionListener;
+
+    /**
+     * Constructor.
+     *
+     * @param port Server port.
+     * @param newConnectionListener New connections listener.
+     * @param messageListener Message listener.
+     * @param serializationRegistry Serialization registry.
+     */
+    public NettyServer(
+        int port,
+        Consumer<SocketChannel> newConnectionListener,
+        BiConsumer<InetSocketAddress, NetworkMessage> messageListener,
+        MessageSerializationRegistry serializationRegistry
+    ) {
+        this.port = port;
+        this.newConnectionListener = newConnectionListener;
+        this.messageListener = messageListener;
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /**
+     * Start server.
+     *
+     * @return Future that resolves when server is successfuly started.
+     */
+    public CompletableFuture<Void> start() {
+        bootstrap.group(bossGroup, workerGroup)
+            .channel(NioServerSocketChannel.class)
+            .childHandler(new ChannelInitializer<SocketChannel>() {
+                /** {@inheritDoc} */
+                @Override public void initChannel(SocketChannel ch)
+                    throws Exception {
+                    ch.pipeline().addLast(
+                        /**
+                         * Decoder that uses {@link 
org.apache.ignite.network.internal.MessageReader}
+                         *  to read chunked data.
+                         */
+                        new InboundDecoder(serializationRegistry),
+                        /** Handles decoded {@link NetworkMessage}s. */
+                        new MessageHandler(messageListener),
+                        /**
+                         * Encoder that uses {@link 
org.apache.ignite.network.internal.MessageWriter}
+                         * to write chunked data.
+                         */
+                        new ChunkedWriteHandler()
+                    );
+
+                    newConnectionListener.accept(ch);
+                }
+            })
+            /**
+             * The maximum queue length for incoming connection indications (a 
request to connect) is set

Review comment:
       Nice comments!

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedInput;
+import java.nio.ByteBuffer;
+import org.apache.ignite.network.internal.direct.DirectMessageWriter;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.MessageSerializer;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Wrapper for channel, that uses {@link ChunkedInput} to send data.
+ */
+public class NettySender {
+    /** Netty channel. */
+    private final Channel channel;
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /**
+     * Constructor.
+     *
+     * @param channel Netty channel.
+     * @param registry Serialization registry.
+     */
+    public NettySender(Channel channel, MessageSerializationRegistry registry) 
{
+        this.channel = channel;
+        serializationRegistry = registry;
+    }
+
+    /**
+     * Send message.
+     *
+     * @param msg Network message.
+     */
+    public void send(NetworkMessage msg) {
+        MessageSerializer<NetworkMessage> serializer = 
serializationRegistry.createSerializer(msg.directType());
+        channel.writeAndFlush(new NetworkMessageChunkedInput(msg, serializer));
+    }
+
+    /**
+     * Chunked input for network message.
+     */
+    private static class NetworkMessageChunkedInput implements 
ChunkedInput<ByteBuf> {
+        /** Network message. */
+        private final NetworkMessage msg;
+
+        /** Message serializer. */
+        private final MessageSerializer<NetworkMessage> serializer;
+
+        /** Message writer. */
+        private final DirectMessageWriter writer = new 
DirectMessageWriter((byte) 1);
+
+        /** Whether the message was fully written. */
+        boolean finished = false;
+
+        /**
+         * Constructor.
+         *
+         * @param msg Network message.
+         * @param serializer Serializer.
+         */
+        private NetworkMessageChunkedInput(NetworkMessage msg, 
MessageSerializer<NetworkMessage> serializer) {
+            this.msg = msg;
+            this.serializer = serializer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isEndOfInput() throws Exception {
+            return finished;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws Exception {
+
+        }
+
+
+        /** {@inheritDoc} */
+        @Deprecated
+        @Override public ByteBuf readChunk(ChannelHandlerContext ctx) throws 
Exception {
+            return readChunk(ctx.alloc());
+        }
+
+        /** {@inheritDoc} */
+        @Override public ByteBuf readChunk(ByteBufAllocator allocator) throws 
Exception {
+            ByteBuf buffer = allocator.buffer(4096);
+            final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096);

Review comment:
       Are you sure this should be a direct byte buffer? As far as I can see, it is
       only used as intermediate storage between `writer` and `buffer`.

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.net.Address;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.DirectProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+
+/**
+ * ScaleCube transport over {@link ConnectionManager}.
+ */
+public class ScaleCubeDirectMarshallerTransport implements Transport {
+    /** Logger. */
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Transport.class);
+
+    /** Message subject. */
+    private final DirectProcessor<Message> subject = DirectProcessor.create();
+
+    /** Message sink. */
+    private final FluxSink<Message> sink = subject.sink();
+
+    /** Close handler */
+    private final MonoProcessor<Void> stop = MonoProcessor.create();
+
+    /** On stop. */
+    private final MonoProcessor<Void> onStop = MonoProcessor.create();
+
+    /** Connection manager. */
+    private final ConnectionManager connectionManager;
+
+    /** Node address. */
+    private final Address address;
+
+    /**
+     * Constructor.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public ScaleCubeDirectMarshallerTransport(ConnectionManager 
connectionManager) {
+        this.connectionManager = connectionManager;
+        this.connectionManager.addListener(this::onMessage);
+        this.address = prepareAddress(connectionManager.getLocalAddress());
+        // Setup cleanup
+        stop.then(doStop())
+            .doFinally(s -> onStop.onComplete())
+            .subscribe(
+                null,
+                ex -> LOGGER.warn("[{}][doStop] Exception occurred: {}", 
address, ex.toString())
+            );
+    }
+
+    /**
+     * Convert {@link InetSocketAddress} to {@link Address}.
+     *
+     * @param addr Address.
+     * @return ScaleCube address.
+     */
+    private static Address prepareAddress(InetSocketAddress addr) {
+        InetAddress address = addr.getAddress();
+
+        int port = addr.getPort();
+
+        if (address.isAnyLocalAddress())
+            return 
Address.create(Address.getLocalIpAddress().getHostAddress(), port);
+        else
+            return Address.create(address.getHostAddress(), port);
+    }
+
+    /**
+     * Cleanup resources on stop.
+     *
+     * @return A mono, that resolves when the stop operation is finished.
+     */
+    private Mono<Void> doStop() {
+        return Mono.defer(() -> {
+            LOGGER.info("[{}][doStop] Stopping", address);
+
+            // Complete incoming messages observable
+            sink.complete();
+
+            LOGGER.info("[{}][doStop] Stopped", address);
+            return Mono.empty();
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public Address address() {
+        return address;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Transport> start() {
+        return Mono.just(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> stop() {
+        return Mono.defer(() -> {
+            stop.onComplete();
+            return onStop;
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isStopped() {
+        return onStop.isDisposed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> send(Address address, Message message) {
+        return Mono.defer(() -> {
+            return 
Mono.fromFuture(connectionManager.channel(InetSocketAddress.createUnresolved(address.host(),
 address.port())));
+        }).flatMap(client -> {
+            client.send(fromMessage(message));
+            return Mono.empty().then();
+        });
+    }
+
+    /**
+     * Handles new network messages from {@link #connectionManager}.
+     *
+     * @param source Message source.
+     * @param msg Network message.
+     */
+    private void onMessage(InetSocketAddress source, NetworkMessage msg) {
+        Message message = fromNetworkMessage(msg);
+
+        if (message != null)
+            sink.next(message);
+    }
+
+    /**
+     * Wrap ScaleCube {@link Message} with {@link NetworkMessage}.
+     *
+     * @param message ScaleCube message.
+     * @return Netowork message that wraps ScaleCube message.
+     * @throws IgniteInternalException If failed to write message to 
ObjectOutputStream.
+     */
+    private NetworkMessage fromMessage(Message message) throws 
IgniteInternalException {
+        Object dataObj = message.data();
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+        ObjectOutputStream o;
+
+        try {

Review comment:
       Same as below about closing the streams

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty server channel wrapper.
+ */
+public class NettyServer {
+    /** {@link ServerSocketChannel} bootstrapper. */
+    private final ServerBootstrap bootstrap = new ServerBootstrap();
+
+    /** Socket accepter event loop group. */
+    private final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
+
+    /** Socket handler event loop group. */
+    private final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+    /** Server port. */
+    private final int port;
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Incoming message listener. */
+    private final BiConsumer<InetSocketAddress, NetworkMessage> 
messageListener;
+
+    /** Server socket channel. */
+    private ServerSocketChannel channel;
+
+    /** New connections listener. */
+    private final Consumer<SocketChannel> newConnectionListener;
+
+    /**
+     * Constructor.
+     *
+     * @param port Server port.
+     * @param newConnectionListener New connections listener.
+     * @param messageListener Message listener.
+     * @param serializationRegistry Serialization registry.
+     */
+    public NettyServer(
+        int port,
+        Consumer<SocketChannel> newConnectionListener,
+        BiConsumer<InetSocketAddress, NetworkMessage> messageListener,
+        MessageSerializationRegistry serializationRegistry
+    ) {
+        this.port = port;
+        this.newConnectionListener = newConnectionListener;
+        this.messageListener = messageListener;
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /**
+     * Start server.
+     *
+     * @return Future that resolves when server is successfuly started.
+     */
+    public CompletableFuture<Void> start() {
+        bootstrap.group(bossGroup, workerGroup)
+            .channel(NioServerSocketChannel.class)
+            .childHandler(new ChannelInitializer<SocketChannel>() {
+                /** {@inheritDoc} */
+                @Override public void initChannel(SocketChannel ch)
+                    throws Exception {
+                    ch.pipeline().addLast(
+                        /**
+                         * Decoder that uses {@link 
org.apache.ignite.network.internal.MessageReader}
+                         *  to read chunked data.
+                         */
+                        new InboundDecoder(serializationRegistry),
+                        /** Handles decoded {@link NetworkMessage}s. */
+                        new MessageHandler(messageListener),
+                        /**
+                         * Encoder that uses {@link 
org.apache.ignite.network.internal.MessageWriter}
+                         * to write chunked data.
+                         */
+                        new ChunkedWriteHandler()
+                    );
+
+                    newConnectionListener.accept(ch);
+                }
+            })
+            /**
+             * The maximum queue length for incoming connection indications (a 
request to connect) is set
+             * to the backlog parameter. If a connection indication arrives 
when the queue is full,
+             * the connection is refused.
+             */
+            .option(ChannelOption.SO_BACKLOG, 128)
+            /**
+             * When the keepalive option is set for a TCP socket and no data 
has been exchanged across the socket
+             * in either direction for 2 hours (NOTE: the actual value is 
implementation dependent),
+             * TCP automatically sends a keepalive probe to the peer.
+             */
+            .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+        CompletableFuture<Void> serverStartFuture = new CompletableFuture<>();
+
+        ChannelFuture bindFuture = bootstrap.bind(port);
+
+        bindFuture.addListener(bind -> {

Review comment:
       Same as in `NettyClient`:
   ```
   bootstrap.bind(port)
       .addListener((ChannelFutureListener) bindFuture -> {
           this.channel = (ServerSocketChannel) bindFuture.channel();
           ....
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.network.internal.MessageReader;
+import org.apache.ignite.network.internal.direct.DirectMessageReader;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ *
+ */
+public class InboundDecoder extends ByteToMessageDecoder {
+    /** Message reader channel attribute key. */
+    private static final AttributeKey<MessageReader> READER_KEY = 
AttributeKey.valueOf("READER");
+
+    /** Message deserializer channel attribute key. */
+    private static final AttributeKey<MessageDeserializer<NetworkMessage>> 
DESERIALIZER_KEY = AttributeKey.valueOf("DESERIALIZER");
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /**
+     * Constructor.
+     *
+     * @param serializationRegistry Serialization registry.
+     */
+    public InboundDecoder(MessageSerializationRegistry serializationRegistry) {
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, 
List<Object> out) throws Exception {
+        ByteBuffer buffer = in.nioBuffer();
+
+        Attribute<MessageReader> readerAttr = ctx.channel().attr(READER_KEY);
+        MessageReader reader = readerAttr.get();
+
+        if (reader == null)
+            readerAttr.set(reader = new 
DirectMessageReader(serializationRegistry, (byte) 1));

Review comment:
       I'm personally not very fond of this style of assignment. Or is it 
allowed by the code style?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty server channel wrapper.
+ */
+public class NettyServer {
+    /** {@link ServerSocketChannel} bootstrapper. */
+    private final ServerBootstrap bootstrap = new ServerBootstrap();
+
+    /** Socket accepter event loop group. */
+    private final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
+
+    /** Socket handler event loop group. */
+    private final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+    /** Server port. */
+    private final int port;
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Incoming message listener. */
+    private final BiConsumer<InetSocketAddress, NetworkMessage> 
messageListener;
+
+    /** Server socket channel. */
+    private ServerSocketChannel channel;
+
+    /** New connections listener. */
+    private final Consumer<SocketChannel> newConnectionListener;
+
+    /**
+     * Constructor.
+     *
+     * @param port Server port.
+     * @param newConnectionListener New connections listener.
+     * @param messageListener Message listener.
+     * @param serializationRegistry Serialization registry.
+     */
+    public NettyServer(
+        int port,
+        Consumer<SocketChannel> newConnectionListener,
+        BiConsumer<InetSocketAddress, NetworkMessage> messageListener,
+        MessageSerializationRegistry serializationRegistry
+    ) {
+        this.port = port;
+        this.newConnectionListener = newConnectionListener;
+        this.messageListener = messageListener;
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /**
+     * Start server.
+     *
+     * @return Future that resolves when server is successfuly started.
+     */
+    public CompletableFuture<Void> start() {
+        bootstrap.group(bossGroup, workerGroup)
+            .channel(NioServerSocketChannel.class)
+            .childHandler(new ChannelInitializer<SocketChannel>() {
+                /** {@inheritDoc} */
+                @Override public void initChannel(SocketChannel ch)
+                    throws Exception {
+                    ch.pipeline().addLast(
+                        /**
+                         * Decoder that uses {@link 
org.apache.ignite.network.internal.MessageReader}
+                         *  to read chunked data.
+                         */
+                        new InboundDecoder(serializationRegistry),
+                        /** Handles decoded {@link NetworkMessage}s. */
+                        new MessageHandler(messageListener),
+                        /**
+                         * Encoder that uses {@link 
org.apache.ignite.network.internal.MessageWriter}
+                         * to write chunked data.
+                         */
+                        new ChunkedWriteHandler()
+                    );
+
+                    newConnectionListener.accept(ch);
+                }
+            })
+            /**
+             * The maximum queue length for incoming connection indications (a 
request to connect) is set
+             * to the backlog parameter. If a connection indication arrives 
when the queue is full,
+             * the connection is refused.
+             */
+            .option(ChannelOption.SO_BACKLOG, 128)
+            /**
+             * When the keepalive option is set for a TCP socket and no data 
has been exchanged across the socket
+             * in either direction for 2 hours (NOTE: the actual value is 
implementation dependent),
+             * TCP automatically sends a keepalive probe to the peer.
+             */
+            .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+        CompletableFuture<Void> serverStartFuture = new CompletableFuture<>();
+
+        ChannelFuture bindFuture = bootstrap.bind(port);
+
+        bindFuture.addListener(bind -> {
+            this.channel = (ServerSocketChannel) bindFuture.channel();
+
+            if (bind.isSuccess())
+                serverStartFuture.complete(null);
+            else {
+                Throwable cause = bind.cause();
+                serverStartFuture.completeExceptionally(cause);
+            }
+
+            // Shutdown event loops on server stop.
+            channel.closeFuture().addListener(close -> {
+                workerGroup.shutdownGracefully();
+                bossGroup.shutdownGracefully();
+            });
+        });
+
+        return serverStartFuture;
+    }
+
+    /**
+     * @return Gets server address.

Review comment:
       ```suggestion
        * @return Gets the server address.
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty server channel wrapper.
+ */
+public class NettyServer {
+    /** {@link ServerSocketChannel} bootstrapper. */
+    private final ServerBootstrap bootstrap = new ServerBootstrap();
+
+    /** Socket accepter event loop group. */
+    private final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
+
+    /** Socket handler event loop group. */
+    private final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+    /** Server port. */
+    private final int port;
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Incoming message listener. */
+    private final BiConsumer<InetSocketAddress, NetworkMessage> 
messageListener;
+
+    /** Server socket channel. */
+    private ServerSocketChannel channel;
+
+    /** New connections listener. */
+    private final Consumer<SocketChannel> newConnectionListener;
+
+    /**
+     * Constructor.
+     *
+     * @param port Server port.
+     * @param newConnectionListener New connections listener.
+     * @param messageListener Message listener.
+     * @param serializationRegistry Serialization registry.
+     */
+    public NettyServer(
+        int port,
+        Consumer<SocketChannel> newConnectionListener,
+        BiConsumer<InetSocketAddress, NetworkMessage> messageListener,
+        MessageSerializationRegistry serializationRegistry
+    ) {
+        this.port = port;
+        this.newConnectionListener = newConnectionListener;
+        this.messageListener = messageListener;
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /**
+     * Start server.
+     *
+     * @return Future that resolves when server is successfuly started.
+     */
+    public CompletableFuture<Void> start() {
+        bootstrap.group(bossGroup, workerGroup)
+            .channel(NioServerSocketChannel.class)
+            .childHandler(new ChannelInitializer<SocketChannel>() {
+                /** {@inheritDoc} */
+                @Override public void initChannel(SocketChannel ch)
+                    throws Exception {
+                    ch.pipeline().addLast(
+                        /**
+                         * Decoder that uses {@link 
org.apache.ignite.network.internal.MessageReader}
+                         *  to read chunked data.
+                         */
+                        new InboundDecoder(serializationRegistry),
+                        /** Handles decoded {@link NetworkMessage}s. */
+                        new MessageHandler(messageListener),
+                        /**
+                         * Encoder that uses {@link 
org.apache.ignite.network.internal.MessageWriter}
+                         * to write chunked data.
+                         */
+                        new ChunkedWriteHandler()
+                    );
+
+                    newConnectionListener.accept(ch);
+                }
+            })
+            /**
+             * The maximum queue length for incoming connection indications (a 
request to connect) is set
+             * to the backlog parameter. If a connection indication arrives 
when the queue is full,
+             * the connection is refused.
+             */
+            .option(ChannelOption.SO_BACKLOG, 128)
+            /**
+             * When the keepalive option is set for a TCP socket and no data 
has been exchanged across the socket
+             * in either direction for 2 hours (NOTE: the actual value is 
implementation dependent),
+             * TCP automatically sends a keepalive probe to the peer.
+             */
+            .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+        CompletableFuture<Void> serverStartFuture = new CompletableFuture<>();
+
+        ChannelFuture bindFuture = bootstrap.bind(port);
+
+        bindFuture.addListener(bind -> {
+            this.channel = (ServerSocketChannel) bindFuture.channel();
+
+            if (bind.isSuccess())
+                serverStartFuture.complete(null);
+            else {
+                Throwable cause = bind.cause();
+                serverStartFuture.completeExceptionally(cause);
+            }
+
+            // Shutdown event loops on server stop.
+            channel.closeFuture().addListener(close -> {
+                workerGroup.shutdownGracefully();

Review comment:
       `shutdownGracefully` returns a future; should we wait for it to complete?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.network.internal.MessageReader;
+import org.apache.ignite.network.internal.direct.DirectMessageReader;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ *
+ */
+public class InboundDecoder extends ByteToMessageDecoder {
+    /** Message reader channel attribute key. */
+    private static final AttributeKey<MessageReader> READER_KEY = 
AttributeKey.valueOf("READER");
+
+    /** Message deserializer channel attribute key. */
+    private static final AttributeKey<MessageDeserializer<NetworkMessage>> 
DESERIALIZER_KEY = AttributeKey.valueOf("DESERIALIZER");
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /**
+     * Constructor.
+     *
+     * @param serializationRegistry Serialization registry.
+     */
+    public InboundDecoder(MessageSerializationRegistry serializationRegistry) {
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, 
List<Object> out) throws Exception {
+        ByteBuffer buffer = in.nioBuffer();
+
+        Attribute<MessageReader> readerAttr = ctx.channel().attr(READER_KEY);
+        MessageReader reader = readerAttr.get();
+
+        if (reader == null)
+            readerAttr.set(reader = new 
DirectMessageReader(serializationRegistry, (byte) 1));

Review comment:
       I think it would be nice to extract `(byte) 1` into a constant

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedInput;
+import java.nio.ByteBuffer;
+import org.apache.ignite.network.internal.direct.DirectMessageWriter;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.MessageSerializer;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Wrapper for channel, that uses {@link ChunkedInput} to send data.
+ */
+public class NettySender {
+    /** Netty channel. */
+    private final Channel channel;
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /**
+     * Constructor.
+     *
+     * @param channel Netty channel.
+     * @param registry Serialization registry.
+     */
+    public NettySender(Channel channel, MessageSerializationRegistry registry) 
{
+        this.channel = channel;
+        serializationRegistry = registry;
+    }
+
+    /**
+     * Send message.
+     *
+     * @param msg Network message.
+     */
+    public void send(NetworkMessage msg) {
+        MessageSerializer<NetworkMessage> serializer = 
serializationRegistry.createSerializer(msg.directType());
+        channel.writeAndFlush(new NetworkMessageChunkedInput(msg, serializer));
+    }
+
+    /**
+     * Chunked input for network message.
+     */
+    private static class NetworkMessageChunkedInput implements 
ChunkedInput<ByteBuf> {
+        /** Network message. */
+        private final NetworkMessage msg;
+
+        /** Message serializer. */
+        private final MessageSerializer<NetworkMessage> serializer;
+
+        /** Message writer. */
+        private final DirectMessageWriter writer = new 
DirectMessageWriter((byte) 1);
+
+        /** Whether the message was fully written. */
+        boolean finished = false;
+
+        /**
+         * Constructor.
+         *
+         * @param msg Network message.
+         * @param serializer Serializer.
+         */
+        private NetworkMessageChunkedInput(NetworkMessage msg, 
MessageSerializer<NetworkMessage> serializer) {
+            this.msg = msg;
+            this.serializer = serializer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isEndOfInput() throws Exception {
+            return finished;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws Exception {
+
+        }
+
+
+        /** {@inheritDoc} */
+        @Deprecated
+        @Override public ByteBuf readChunk(ChannelHandlerContext ctx) throws 
Exception {
+            return readChunk(ctx.alloc());
+        }
+
+        /** {@inheritDoc} */
+        @Override public ByteBuf readChunk(ByteBufAllocator allocator) throws 
Exception {
+            ByteBuf buffer = allocator.buffer(4096);
+            final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096);

Review comment:
       ```suggestion
               ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096);
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty server channel wrapper.
+ */
+public class NettyServer {
+    /** {@link ServerSocketChannel} bootstrapper. */
+    private final ServerBootstrap bootstrap = new ServerBootstrap();
+
+    /** Socket accepter event loop group. */
+    private final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
+
+    /** Socket handler event loop group. */
+    private final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+    /** Server port. */
+    private final int port;
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Incoming message listener. */
+    private final BiConsumer<InetSocketAddress, NetworkMessage> 
messageListener;
+
+    /** Server socket channel. */
+    private ServerSocketChannel channel;
+
+    /** New connections listener. */
+    private final Consumer<SocketChannel> newConnectionListener;
+
+    /**
+     * Constructor.
+     *
+     * @param port Server port.
+     * @param newConnectionListener New connections listener.
+     * @param messageListener Message listener.
+     * @param serializationRegistry Serialization registry.
+     */
+    public NettyServer(
+        int port,
+        Consumer<SocketChannel> newConnectionListener,
+        BiConsumer<InetSocketAddress, NetworkMessage> messageListener,
+        MessageSerializationRegistry serializationRegistry
+    ) {
+        this.port = port;
+        this.newConnectionListener = newConnectionListener;
+        this.messageListener = messageListener;
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /**
+     * Start server.

Review comment:
       ```suggestion
        * Starts the server.
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedInput;
+import java.nio.ByteBuffer;
+import org.apache.ignite.network.internal.direct.DirectMessageWriter;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.MessageSerializer;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Wrapper for channel, that uses {@link ChunkedInput} to send data.
+ */
+public class NettySender {
+    /** Netty channel. */
+    private final Channel channel;
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /**
+     * Constructor.
+     *
+     * @param channel Netty channel.
+     * @param registry Serialization registry.
+     */
+    public NettySender(Channel channel, MessageSerializationRegistry registry) 
{
+        this.channel = channel;
+        serializationRegistry = registry;
+    }
+
+    /**
+     * Send message.
+     *
+     * @param msg Network message.
+     */
+    public void send(NetworkMessage msg) {
+        MessageSerializer<NetworkMessage> serializer = 
serializationRegistry.createSerializer(msg.directType());
+        channel.writeAndFlush(new NetworkMessageChunkedInput(msg, serializer));
+    }
+
+    /**
+     * Chunked input for network message.
+     */
+    private static class NetworkMessageChunkedInput implements 
ChunkedInput<ByteBuf> {
+        /** Network message. */
+        private final NetworkMessage msg;
+
+        /** Message serializer. */
+        private final MessageSerializer<NetworkMessage> serializer;
+
+        /** Message writer. */
+        private final DirectMessageWriter writer = new 
DirectMessageWriter((byte) 1);

Review comment:
       I think `(byte) 1` should be extracted into a constant

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty server channel wrapper.
+ */
+public class NettyServer {
+    /** {@link ServerSocketChannel} bootstrapper. */
+    private final ServerBootstrap bootstrap = new ServerBootstrap();
+
+    /** Socket accepter event loop group. */
+    private final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
+
+    /** Socket handler event loop group. */
+    private final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+    /** Server port. */
+    private final int port;
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Incoming message listener. */
+    private final BiConsumer<InetSocketAddress, NetworkMessage> 
messageListener;
+
+    /** Server socket channel. */
+    private ServerSocketChannel channel;
+
+    /** New connections listener. */
+    private final Consumer<SocketChannel> newConnectionListener;
+
+    /**
+     * Constructor.
+     *
+     * @param port Server port.
+     * @param newConnectionListener New connections listener.
+     * @param messageListener Message listener.
+     * @param serializationRegistry Serialization registry.
+     */
+    public NettyServer(
+        int port,
+        Consumer<SocketChannel> newConnectionListener,
+        BiConsumer<InetSocketAddress, NetworkMessage> messageListener,
+        MessageSerializationRegistry serializationRegistry
+    ) {
+        this.port = port;
+        this.newConnectionListener = newConnectionListener;
+        this.messageListener = messageListener;
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /**
+     * Start server.
+     *
+     * @return Future that resolves when server is successfuly started.

Review comment:
       ```suggestion
        * @return Future that gets resolved when the server is successfully 
started.
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.network.internal.MessageReader;
+import org.apache.ignite.network.internal.direct.DirectMessageReader;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ *
+ */
+public class InboundDecoder extends ByteToMessageDecoder {
+    /** Message reader channel attribute key. */
+    private static final AttributeKey<MessageReader> READER_KEY = 
AttributeKey.valueOf("READER");
+
+    /** Message deserializer channel attribute key. */
+    private static final AttributeKey<MessageDeserializer<NetworkMessage>> 
DESERIALIZER_KEY = AttributeKey.valueOf("DESERIALIZER");
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /**
+     * Constructor.
+     *
+     * @param serializationRegistry Serialization registry.
+     */
+    public InboundDecoder(MessageSerializationRegistry serializationRegistry) {
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, 
List<Object> out) throws Exception {
+        ByteBuffer buffer = in.nioBuffer();
+
+        Attribute<MessageReader> readerAttr = ctx.channel().attr(READER_KEY);

Review comment:
       BTW, `ctx` itself is already an `AttributeMap`

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/message/ScaleCubeMessage.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube.message;
+
+import java.util.Map;
+import org.apache.ignite.network.message.NetworkMessage;
+
+public class ScaleCubeMessage implements NetworkMessage {

Review comment:
       There are no javadocs in this class

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/message/ScaleCubeMessageSerializationFactory.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube.message;
+
+import java.util.Map;
+import org.apache.ignite.network.internal.MessageReader;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageMappingException;
+import org.apache.ignite.network.message.MessageSerializationFactory;
+import org.apache.ignite.network.message.MessageSerializer;
+import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+
+public class ScaleCubeMessageSerializationFactory implements 
MessageSerializationFactory<ScaleCubeMessage> {
+    /** {@inheritDoc} */
+    @Override public MessageDeserializer<ScaleCubeMessage> 
createDeserializer() {
+        return new MessageDeserializer<ScaleCubeMessage>() {

Review comment:
       ```suggestion
           return new MessageDeserializer<>() {
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty server channel wrapper.
+ */
+public class NettyServer {
+    /** {@link ServerSocketChannel} bootstrapper. */
+    private final ServerBootstrap bootstrap = new ServerBootstrap();
+
+    /** Socket accepter event loop group. */
+    private final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
+
+    /** Socket handler event loop group. */
+    private final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+    /** Server port. */
+    private final int port;
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Incoming message listener. */
+    private final BiConsumer<InetSocketAddress, NetworkMessage> 
messageListener;
+
+    /** Server socket channel. */
+    private ServerSocketChannel channel;
+
+    /** New connections listener. */
+    private final Consumer<SocketChannel> newConnectionListener;
+
+    /**
+     * Constructor.
+     *
+     * @param port Server port.
+     * @param newConnectionListener New connections listener.
+     * @param messageListener Message listener.
+     * @param serializationRegistry Serialization registry.
+     */
+    public NettyServer(
+        int port,
+        Consumer<SocketChannel> newConnectionListener,
+        BiConsumer<InetSocketAddress, NetworkMessage> messageListener,
+        MessageSerializationRegistry serializationRegistry
+    ) {
+        this.port = port;
+        this.newConnectionListener = newConnectionListener;
+        this.messageListener = messageListener;
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /**
+     * Start server.
+     *
+     * @return Future that resolves when server is successfuly started.
+     */
+    public CompletableFuture<Void> start() {
+        bootstrap.group(bossGroup, workerGroup)
+            .channel(NioServerSocketChannel.class)
+            .childHandler(new ChannelInitializer<SocketChannel>() {
+                /** {@inheritDoc} */
+                @Override public void initChannel(SocketChannel ch)
+                    throws Exception {
+                    ch.pipeline().addLast(
+                        /**
+                         * Decoder that uses {@link 
org.apache.ignite.network.internal.MessageReader}
+                         *  to read chunked data.
+                         */
+                        new InboundDecoder(serializationRegistry),
+                        /** Handles decoded {@link NetworkMessage}s. */
+                        new MessageHandler(messageListener),
+                        /**
+                         * Encoder that uses {@link 
org.apache.ignite.network.internal.MessageWriter}
+                         * to write chunked data.
+                         */
+                        new ChunkedWriteHandler()
+                    );
+
+                    newConnectionListener.accept(ch);
+                }
+            })
+            /**
+             * The maximum queue length for incoming connection indications (a 
request to connect) is set
+             * to the backlog parameter. If a connection indication arrives 
when the queue is full,
+             * the connection is refused.
+             */
+            .option(ChannelOption.SO_BACKLOG, 128)
+            /**
+             * When the keepalive option is set for a TCP socket and no data 
has been exchanged across the socket
+             * in either direction for 2 hours (NOTE: the actual value is 
implementation dependent),
+             * TCP automatically sends a keepalive probe to the peer.
+             */
+            .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+        CompletableFuture<Void> serverStartFuture = new CompletableFuture<>();
+
+        ChannelFuture bindFuture = bootstrap.bind(port);
+
+        bindFuture.addListener(bind -> {
+            this.channel = (ServerSocketChannel) bindFuture.channel();
+
+            if (bind.isSuccess())
+                serverStartFuture.complete(null);
+            else {
+                Throwable cause = bind.cause();
+                serverStartFuture.completeExceptionally(cause);
+            }
+
+            // Shutdown event loops on server stop.
+            channel.closeFuture().addListener(close -> {
+                workerGroup.shutdownGracefully();
+                bossGroup.shutdownGracefully();
+            });
+        });
+
+        return serverStartFuture;
+    }
+
+    /**
+     * @return Gets server address.
+     */
+    public InetSocketAddress address() {
+        return channel.localAddress();
+    }
+
+    /**
+     * Stop server.

Review comment:
       ```suggestion
        * Stops the server.
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.net.Address;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.DirectProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+
+/**
+ * ScaleCube transport over {@link ConnectionManager}.
+ */
+public class ScaleCubeDirectMarshallerTransport implements Transport {
+    /** Logger. */
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Transport.class);
+
+    /** Message subject. */
+    private final DirectProcessor<Message> subject = DirectProcessor.create();
+
+    /** Message sink. */
+    private final FluxSink<Message> sink = subject.sink();
+
+    /** Close handler */
+    private final MonoProcessor<Void> stop = MonoProcessor.create();
+
+    /** On stop. */
+    private final MonoProcessor<Void> onStop = MonoProcessor.create();
+
+    /** Connection manager. */
+    private final ConnectionManager connectionManager;
+
+    /** Node address. */
+    private final Address address;
+
+    /**
+     * Constructor.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public ScaleCubeDirectMarshallerTransport(ConnectionManager 
connectionManager) {
+        this.connectionManager = connectionManager;
+        this.connectionManager.addListener(this::onMessage);
+        this.address = prepareAddress(connectionManager.getLocalAddress());
+        // Setup cleanup
+        stop.then(doStop())
+            .doFinally(s -> onStop.onComplete())
+            .subscribe(
+                null,
+                ex -> LOGGER.warn("[{}][doStop] Exception occurred: {}", 
address, ex.toString())
+            );
+    }
+
+    /**
+     * Convert {@link InetSocketAddress} to {@link Address}.
+     *
+     * @param addr Address.
+     * @return ScaleCube address.
+     */
+    private static Address prepareAddress(InetSocketAddress addr) {
+        InetAddress address = addr.getAddress();
+
+        int port = addr.getPort();
+
+        if (address.isAnyLocalAddress())

Review comment:
       Looks like this check is not needed: localhost patterns are handled by 
`Address.from`.

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/message/ScaleCubeMessageSerializationFactory.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube.message;
+
+import java.util.Map;
+import org.apache.ignite.network.internal.MessageReader;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageMappingException;
+import org.apache.ignite.network.message.MessageSerializationFactory;
+import org.apache.ignite.network.message.MessageSerializer;
+import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+
+public class ScaleCubeMessageSerializationFactory implements 
MessageSerializationFactory<ScaleCubeMessage> {
+    /** {@inheritDoc} */
+    @Override public MessageDeserializer<ScaleCubeMessage> 
createDeserializer() {
+        return new MessageDeserializer<ScaleCubeMessage>() {
+
+            ScaleCubeMessage obj;
+
+            byte[] array;
+
+            Map<String, String> headers;
+
+            /** {@inheritDoc} */
+            @Override public boolean readMessage(MessageReader reader) throws 
MessageMappingException {
+                if (!reader.beforeMessageRead())
+                    return false;
+
+                switch (reader.state()) {
+                    case 0:
+                        array = reader.readByteArray("array");
+
+                        if (!reader.isLastRead())
+                            return false;
+
+                        reader.incrementState();
+
+                    //noinspection fallthrough
+                    case 1:
+                        headers = reader.readMap("headers", 
MessageCollectionItemType.STRING, MessageCollectionItemType.STRING, false);

Review comment:
       I don't understand: shouldn't we read the headers first?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.network.internal.MessageReader;
+import org.apache.ignite.network.internal.direct.DirectMessageReader;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ *
+ */
+public class InboundDecoder extends ByteToMessageDecoder {
+    /** Message reader channel attribute key. */
+    private static final AttributeKey<MessageReader> READER_KEY = 
AttributeKey.valueOf("READER");
+
+    /** Message deserializer channel attribute key. */
+    private static final AttributeKey<MessageDeserializer<NetworkMessage>> 
DESERIALIZER_KEY = AttributeKey.valueOf("DESERIALIZER");
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /**
+     * Constructor.
+     *
+     * @param serializationRegistry Serialization registry.
+     */
+    public InboundDecoder(MessageSerializationRegistry serializationRegistry) {
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, 
List<Object> out) throws Exception {
+        ByteBuffer buffer = in.nioBuffer();
+
+        Attribute<MessageReader> readerAttr = ctx.channel().attr(READER_KEY);
+        MessageReader reader = readerAttr.get();
+
+        if (reader == null)
+            readerAttr.set(reader = new 
DirectMessageReader(serializationRegistry, (byte) 1));
+
+        Attribute<MessageDeserializer<NetworkMessage>> messageAttr = 
ctx.channel().attr(DESERIALIZER_KEY);
+
+        while (buffer.hasRemaining()) {
+            MessageDeserializer<NetworkMessage> msg = messageAttr.get();
+
+            try {
+                // Read message type.
+                if (msg == null && buffer.remaining() >= 
NetworkMessage.DIRECT_TYPE_SIZE) {
+                    byte b0 = buffer.get();
+                    byte b1 = buffer.get();
+
+                    msg = 
serializationRegistry.createDeserializer(makeMessageType(b0, b1));
+                }
+
+                boolean finished = false;
+
+                // Read message if buffer has remaining data.
+                if (msg != null && buffer.hasRemaining()) {
+                    reader.setCurrentReadClass(msg.klass());
+                    reader.setBuffer(buffer);
+
+                    finished = msg.readMessage(reader);
+                }
+
+                // Set read position to Netty's ByteBuf.
+                in.readerIndex(buffer.position());
+
+                if (finished) {
+                    reader.reset();
+                    messageAttr.set(null);
+
+                    out.add(msg.getMessage());
+                }
+                else
+                    messageAttr.set(msg);
+            }
+            catch (Throwable e) {
+                System.err.println("Failed to read message [msg=" + msg +
+                    ", buf=" + buffer +
+                    ", reader=" + reader + "]: " + e.getMessage());
+
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Concatenates the two parameter bytes to form a message type value.
+     *
+     * @param b0 The first byte.
+     * @param b1 The second byte.
+     */
+    public static short makeMessageType(byte b0, byte b1) {

Review comment:
       ```suggestion
       private static short makeMessageType(byte b0, byte b1) {
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.net.Address;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.DirectProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+
+/**
+ * ScaleCube transport over {@link ConnectionManager}.
+ */
+public class ScaleCubeDirectMarshallerTransport implements Transport {
+    /** Logger. */
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Transport.class);

Review comment:
       Should we use the `IgniteLogger` here?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.net.Address;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.DirectProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+
+/**
+ * ScaleCube transport over {@link ConnectionManager}.
+ */
+public class ScaleCubeDirectMarshallerTransport implements Transport {
+    /** Logger. */
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Transport.class);
+
+    /** Message subject. */
+    private final DirectProcessor<Message> subject = DirectProcessor.create();
+
+    /** Message sink. */
+    private final FluxSink<Message> sink = subject.sink();
+
+    /** Close handler */
+    private final MonoProcessor<Void> stop = MonoProcessor.create();
+
+    /** On stop. */
+    private final MonoProcessor<Void> onStop = MonoProcessor.create();
+
+    /** Connection manager. */
+    private final ConnectionManager connectionManager;
+
+    /** Node address. */
+    private final Address address;
+
+    /**
+     * Constructor.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public ScaleCubeDirectMarshallerTransport(ConnectionManager 
connectionManager) {
+        this.connectionManager = connectionManager;
+        this.connectionManager.addListener(this::onMessage);
+        this.address = prepareAddress(connectionManager.getLocalAddress());
+        // Setup cleanup
+        stop.then(doStop())
+            .doFinally(s -> onStop.onComplete())
+            .subscribe(
+                null,
+                ex -> LOGGER.warn("[{}][doStop] Exception occurred: {}", 
address, ex.toString())
+            );
+    }
+
+    /**
+     * Convert {@link InetSocketAddress} to {@link Address}.
+     *
+     * @param addr Address.
+     * @return ScaleCube address.
+     */
+    private static Address prepareAddress(InetSocketAddress addr) {
+        InetAddress address = addr.getAddress();
+
+        int port = addr.getPort();
+
+        if (address.isAnyLocalAddress())
+            return 
Address.create(Address.getLocalIpAddress().getHostAddress(), port);
+        else
+            return Address.create(address.getHostAddress(), port);
+    }
+
+    /**
+     * Cleanup resources on stop.
+     *
+     * @return A mono, that resolves when the stop operation is finished.
+     */
+    private Mono<Void> doStop() {
+        return Mono.defer(() -> {
+            LOGGER.info("[{}][doStop] Stopping", address);
+
+            // Complete incoming messages observable
+            sink.complete();
+
+            LOGGER.info("[{}][doStop] Stopped", address);
+            return Mono.empty();
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public Address address() {
+        return address;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Transport> start() {
+        return Mono.just(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> stop() {
+        return Mono.defer(() -> {
+            stop.onComplete();
+            return onStop;
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isStopped() {
+        return onStop.isDisposed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> send(Address address, Message message) {
+        return Mono.defer(() -> {

Review comment:
       I would suggest reformatting this part to make it easier to read, 
for example:
   ```
   var inetAddr = InetSocketAddress.createUnresolved(address.host(), 
address.port());
   return Mono.defer(() -> Mono.fromFuture(connectionManager.channel(inetAddr)))
       .doOnNext(client -> client.send(fromMessage(message)))
       .then();
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.net.Address;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.DirectProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+
+/**
+ * ScaleCube transport over {@link ConnectionManager}.
+ */
+public class ScaleCubeDirectMarshallerTransport implements Transport {
+    /** Logger. */
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Transport.class);
+
+    /** Message subject. */
+    private final DirectProcessor<Message> subject = DirectProcessor.create();
+
+    /** Message sink. */
+    private final FluxSink<Message> sink = subject.sink();
+
+    /** Close handler */
+    private final MonoProcessor<Void> stop = MonoProcessor.create();
+
+    /** On stop. */
+    private final MonoProcessor<Void> onStop = MonoProcessor.create();
+
+    /** Connection manager. */
+    private final ConnectionManager connectionManager;
+
+    /** Node address. */
+    private final Address address;
+
+    /**
+     * Constructor.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public ScaleCubeDirectMarshallerTransport(ConnectionManager 
connectionManager) {
+        this.connectionManager = connectionManager;
+        this.connectionManager.addListener(this::onMessage);
+        this.address = prepareAddress(connectionManager.getLocalAddress());
+        // Setup cleanup
+        stop.then(doStop())
+            .doFinally(s -> onStop.onComplete())
+            .subscribe(
+                null,
+                ex -> LOGGER.warn("[{}][doStop] Exception occurred: {}", 
address, ex.toString())
+            );
+    }
+
+    /**
+     * Convert {@link InetSocketAddress} to {@link Address}.
+     *
+     * @param addr Address.
+     * @return ScaleCube address.
+     */
+    private static Address prepareAddress(InetSocketAddress addr) {
+        InetAddress address = addr.getAddress();
+
+        int port = addr.getPort();
+
+        if (address.isAnyLocalAddress())
+            return 
Address.create(Address.getLocalIpAddress().getHostAddress(), port);
+        else
+            return Address.create(address.getHostAddress(), port);
+    }
+
+    /**
+     * Cleanup resources on stop.
+     *
+     * @return A mono, that resolves when the stop operation is finished.
+     */
+    private Mono<Void> doStop() {
+        return Mono.defer(() -> {
+            LOGGER.info("[{}][doStop] Stopping", address);
+
+            // Complete incoming messages observable
+            sink.complete();
+
+            LOGGER.info("[{}][doStop] Stopped", address);
+            return Mono.empty();
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public Address address() {
+        return address;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Transport> start() {
+        return Mono.just(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> stop() {
+        return Mono.defer(() -> {
+            stop.onComplete();
+            return onStop;
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isStopped() {
+        return onStop.isDisposed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> send(Address address, Message message) {
+        return Mono.defer(() -> {
+            return 
Mono.fromFuture(connectionManager.channel(InetSocketAddress.createUnresolved(address.host(),
 address.port())));
+        }).flatMap(client -> {
+            client.send(fromMessage(message));
+            return Mono.empty().then();
+        });
+    }
+
+    /**
+     * Handles new network messages from {@link #connectionManager}.
+     *
+     * @param source Message source.
+     * @param msg Network message.
+     */
+    private void onMessage(InetSocketAddress source, NetworkMessage msg) {
+        Message message = fromNetworkMessage(msg);
+
+        if (message != null)
+            sink.next(message);
+    }
+
+    /**
+     * Wrap ScaleCube {@link Message} with {@link NetworkMessage}.
+     *
+     * @param message ScaleCube message.
+     * @return Network message that wraps ScaleCube message.
+     * @throws IgniteInternalException If failed to write message to 
ObjectOutputStream.
+     */
+    private NetworkMessage fromMessage(Message message) throws 
IgniteInternalException {
+        Object dataObj = message.data();
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+        ObjectOutputStream o;
+
+        try {
+            o = new ObjectOutputStream(stream);
+            o.writeObject(dataObj);
+        }
+        catch (IOException e) {
+            throw new IgniteInternalException(e);
+        }
+
+        return new ScaleCubeMessage(stream.toByteArray(), message.headers());
+    }
+
+    /**
+     * Unwrap ScaleCube {@link Message} from {@link NetworkMessage}.
+     *
+     * @param networkMessage Network message.
+     * @return ScaleCube message.
+     * @throws IgniteInternalException If failed to read ScaleCube message 
byte array.
+     */
+    private Message fromNetworkMessage(NetworkMessage networkMessage) throws 
IgniteInternalException {
+        if (networkMessage instanceof ScaleCubeMessage) {
+            ScaleCubeMessage msg = (ScaleCubeMessage) networkMessage;
+
+            Map<String, String> headers = msg.getHeaders();
+
+            Object obj;
+
+            try {
+                obj = new ObjectInputStream(new 
ByteArrayInputStream(msg.getArray())).readObject();

Review comment:
       I understand that it is not strictly necessary here, but let's handle 
resources properly:
   ```
   try (var in = new ObjectInputStream(new 
ByteArrayInputStream(msg.getArray()))) {
       obj = in.readObject();
   }
   ```

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
##########
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.net.Address;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.DirectProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+
+/**
+ * ScaleCube transport over {@link ConnectionManager}.
+ */
+public class ScaleCubeDirectMarshallerTransport implements Transport {
+    /** Logger. */
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Transport.class);
+
+    /** Message subject. */
+    private final DirectProcessor<Message> subject = DirectProcessor.create();
+
+    /** Message sink. */
+    private final FluxSink<Message> sink = subject.sink();
+
+    /** Close handler */
+    private final MonoProcessor<Void> stop = MonoProcessor.create();
+
+    /** On stop. */
+    private final MonoProcessor<Void> onStop = MonoProcessor.create();
+
+    /** Connection manager. */
+    private final ConnectionManager connectionManager;
+
+    /** Node address. */
+    private final Address address;
+
+    /**
+     * Constructor.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public ScaleCubeDirectMarshallerTransport(ConnectionManager 
connectionManager) {
+        this.connectionManager = connectionManager;
+        this.connectionManager.addListener(this::onMessage);
+        this.address = prepareAddress(connectionManager.getLocalAddress());
+        // Setup cleanup
+        stop.then(doStop())
+            .doFinally(s -> onStop.onComplete())
+            .subscribe(
+                null,
+                ex -> LOGGER.warn("[{}][doStop] Exception occurred: {}", 
address, ex.toString())
+            );
+    }
+
+    /**
+     * Convert {@link InetSocketAddress} to {@link Address}.
+     *
+     * @param addr Address.
+     * @return ScaleCube address.
+     */
+    private static Address prepareAddress(InetSocketAddress addr) {
+        InetAddress address = addr.getAddress();
+
+        int port = addr.getPort();
+
+        if (address.isAnyLocalAddress())
+            return 
Address.create(Address.getLocalIpAddress().getHostAddress(), port);
+        else
+            return Address.create(address.getHostAddress(), port);
+    }
+
+    /**
+     * Cleanup resources on stop.
+     *
+     * @return A mono, that resolves when the stop operation is finished.
+     */
+    private Mono<Void> doStop() {
+        return Mono.defer(() -> {
+            LOGGER.info("[{}][doStop] Stopping", address);
+
+            // Complete incoming messages observable
+            sink.complete();
+
+            LOGGER.info("[{}][doStop] Stopped", address);
+            return Mono.empty();
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public Address address() {
+        return address;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Transport> start() {
+        return Mono.just(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> stop() {
+        return Mono.defer(() -> {
+            stop.onComplete();
+            return onStop;
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isStopped() {
+        return onStop.isDisposed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> send(Address address, Message message) {
+        return Mono.defer(() -> {
+            return 
Mono.fromFuture(connectionManager.channel(InetSocketAddress.createUnresolved(address.host(),
 address.port())));
+        }).flatMap(client -> {
+            client.send(fromMessage(message));
+            return Mono.empty().then();

Review comment:
       Why do you need `then` here? `Mono.empty()` already completes without 
emitting a value. I also think that `flatMap` is redundant; see my suggestion 
above.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to