ibessonov commented on a change in pull request #102:
URL: https://github.com/apache/ignite-3/pull/102#discussion_r622044389



##########
File path: 
modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/ITScaleCubeNetworkMessagingTest.java
##########
@@ -179,11 +183,14 @@ private static void stopForcefully(ClusterService 
cluster) throws Exception {
      * Wrapper for cluster.
      */
     private static final class Cluster {
-        /** */
-        private static final MessageSerializationRegistry 
SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
         /** */
         private static final ClusterServiceFactory NETWORK_FACTORY = new 
ScaleCubeClusterServiceFactory();
 
+        /** */
+        private static final MessageSerializationRegistry 
SERIALIZATION_REGISTRY = new MessageSerializationRegistry()

Review comment:
       Please don't make these things static even in tests

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Class that manages connections both incoming and outgoing.
+ */
+public class ConnectionManager {
+    /** Latest version of the direct marshalling protocol. */
+    public static final byte DIRECT_PROTOCOL_VERSION = 1;
+
+    /** Client bootstrap. */
+    private final Bootstrap clientBootstrap;
+
+    /** Client socket channel handler event loop group. */
+    private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup();
+
+    /** Server. */
+    private final NettyServer server;
+
+    /** Channels. */
+    private final Map<InetSocketAddress, NettyChannel> channels = new 
ConcurrentHashMap<>();
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Message listeners. */
+    private final List<BiConsumer<InetSocketAddress, NetworkMessage>> 
listeners = new CopyOnWriteArrayList<>(new ArrayList<>());
+
+    private Lock lock = new ReentrantLock();
+
+    public ConnectionManager(int port, MessageSerializationRegistry provider) {
+        this.serializationRegistry = provider;
+        this.server = new NettyServer(port, this::onNewIncomingChannel, 
this::onMessage, serializationRegistry);
+        this.clientBootstrap = NettyClient.setupBootstrap(clientWorkerGroup, 
serializationRegistry, this::onMessage);
+    }
+
+    /**
+     * Start server.
+     *
+     * @throws IgniteInternalException If failed to start.
+     */
+    public void start() throws IgniteInternalException {
+        try {
+            server.start().join();
+        }
+        catch (CompletionException e) {
+            Throwable cause = e.getCause();
+            throw new IgniteInternalException("Failed to start server: " + 
cause.getMessage(), cause);
+        }
+    }
+
+    /**
+     * @return Server local address.
+     */
+    public InetSocketAddress getLocalAddress() {
+        return server.address();
+    }
+
+    /**
+     * Get a {@link NettySender}, that sends data from this node to another 
node with the specified address.
+     * @param address Another node's address.
+     * @return Sender.
+     */
+    public CompletableFuture<NettySender> channel(InetSocketAddress address) {
+        NettyChannel channel = channels.computeIfAbsent(address, 
this::connect);
+
+        return channel.channel();
+    }
+
+    /**
+     * Callback that is called upon receiving of a new message.
+     *
+     * @param from Source of the message.
+     * @param message New message.
+     */
+    private void onMessage(InetSocketAddress from, NetworkMessage message) {
+        listeners.forEach(consumer -> consumer.accept(from, message));
+    }
+
+    /**
+     * Callback that is called upon new client connected to the server.
+     *
+     * @param channel Channel from client to this {@link #server}.
+     */
+    private void onNewIncomingChannel(SocketChannel channel) {
+        InetSocketAddress remoteAddress = channel.remoteAddress();
+
+        lock.lock();
+
+        try {
+            NettyChannel existingChannel = channels.get(remoteAddress);
+
+            if (existingChannel != null && existingChannel.isOpen())
+                channel.close();
+            else {
+                NettyChannel serverChannel = NettyChannel.fromServer(new 
NettySender(channel, serializationRegistry));
+                channels.put(remoteAddress, serverChannel);
+
+                if (existingChannel != null)
+                    existingChannel.close();
+            }
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Create new client from this node to specified address.
+     *
+     * @param address Target address.
+     * @return New netty client.
+     */
+    private NettyChannel connect(InetSocketAddress address) {
+        CompletableFuture<NettySender> fut = new CompletableFuture<>();
+
+        connect0(fut, address, 3);
+
+        return NettyChannel.fromFuture(fut);
+    }
+
+    private void connect0(CompletableFuture<NettySender> fut, 
InetSocketAddress address, int retryCount) {
+        NettyClient client = new NettyClient(
+            address.getHostName(),
+            address.getPort(),
+            serializationRegistry
+        );
+
+        client.start(clientBootstrap).whenComplete((sender, throwable) -> {
+            lock.lock();
+
+            try {
+                if (throwable != null) {
+                    NettyChannel existingChannel = channels.get(address);
+
+                    if (existingChannel != null && existingChannel.isOpen()) {
+                        try {
+                            NettySender sender1 = 
existingChannel.channel().get();
+                            fut.complete(sender1);
+                        }
+                        catch (Exception e) {
+                            e.printStackTrace();

Review comment:
       Please fix this place

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedInput;
+import java.nio.ByteBuffer;
+import org.apache.ignite.network.internal.direct.DirectMessageWriter;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.MessageSerializer;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Wrapper for a Netty {@link Channel}, that uses {@link ChunkedInput} and 
{@link DirectMessageWriter} to send data.
+ */
+public class NettySender {
+    /** Netty channel. */
+    private final Channel channel;
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /**
+     * Constructor.
+     *
+     * @param channel Netty channel.
+     * @param registry Serialization registry.
+     */
+    public NettySender(Channel channel, MessageSerializationRegistry registry) 
{
+        this.channel = channel;
+        serializationRegistry = registry;
+    }
+
+    /**
+     * Send message.
+     *
+     * @param msg Network message.
+     */
+    public void send(NetworkMessage msg) {
+        MessageSerializer<NetworkMessage> serializer = 
serializationRegistry.createSerializer(msg.directType());
+        channel.writeAndFlush(new NetworkMessageChunkedInput(msg, serializer));

Review comment:
       hm, should we flush every message? What other options do we have?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.net.Address;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.DirectProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+
+/**
+ * ScaleCube transport over {@link ConnectionManager}.
+ */
+public class ScaleCubeDirectMarshallerTransport implements Transport {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(Transport.class);
+
+    /** Message subject. */
+    private final DirectProcessor<Message> subject = DirectProcessor.create();
+
+    /** Message sink. */
+    private final FluxSink<Message> sink = subject.sink();
+
+    /** Close handler */
+    private final MonoProcessor<Void> stop = MonoProcessor.create();
+
+    /** On stop. */
+    private final MonoProcessor<Void> onStop = MonoProcessor.create();
+
+    /** Connection manager. */
+    private final ConnectionManager connectionManager;
+
+    /** Node address. */
+    private final Address address;
+
+    /**
+     * Constructor.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public ScaleCubeDirectMarshallerTransport(ConnectionManager 
connectionManager) {
+        this.connectionManager = connectionManager;
+        this.connectionManager.addListener(this::onMessage);
+        this.address = prepareAddress(connectionManager.getLocalAddress());
+        // Setup cleanup
+        stop.then(doStop())
+            .doFinally(s -> onStop.onComplete())
+            .subscribe(
+                null,
+                ex -> LOG.warn("[{0}][doStop] Exception occurred: {1}", 
address, ex.toString())

Review comment:
       Looks like we can update message format, do we?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.internal.MessageReader;
+import org.apache.ignite.network.internal.direct.DirectMessageReader;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Decodes {@link ByteBuf}s into {@link NetworkMessage}s.
+ */
+public class InboundDecoder extends ByteToMessageDecoder {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(InboundDecoder.class);
+
+    /** Message reader channel attribute key. */
+    private static final AttributeKey<MessageReader> READER_KEY = 
AttributeKey.valueOf("READER");
+
+    /** Message deserializer channel attribute key. */
+    private static final AttributeKey<MessageDeserializer<NetworkMessage>> 
DESERIALIZER_KEY = AttributeKey.valueOf("DESERIALIZER");
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /**
+     * Constructor.
+     *
+     * @param serializationRegistry Serialization registry.
+     */
+    public InboundDecoder(MessageSerializationRegistry serializationRegistry) {
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, 
List<Object> out) throws Exception {
+        ByteBuffer buffer = in.nioBuffer();
+
+        Attribute<MessageReader> readerAttr = ctx.attr(READER_KEY);
+        MessageReader reader = readerAttr.get();
+
+        if (reader == null) {
+            reader = new DirectMessageReader(serializationRegistry, 
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+            readerAttr.set(reader);
+        }
+
+        Attribute<MessageDeserializer<NetworkMessage>> messageAttr = 
ctx.attr(DESERIALIZER_KEY);
+
+        while (buffer.hasRemaining()) {
+            MessageDeserializer<NetworkMessage> msg = messageAttr.get();
+
+            try {
+                // Read message type.
+                if (msg == null && buffer.remaining() >= 
NetworkMessage.DIRECT_TYPE_SIZE) {
+                    byte b0 = buffer.get();

Review comment:
       Short type can be compressed for first 128 values, which is enough for 
most cases. Anyway, **short type = SomeUtil.readType(buffer)**  would look 
better.

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty server channel wrapper.
+ */
+public class NettyServer {
+    /** {@link ServerSocketChannel} bootstrapper. */
+    private final ServerBootstrap bootstrap = new ServerBootstrap();
+
+    /** Socket accepter event loop group. */
+    private final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
+
+    /** Socket handler event loop group. */
+    private final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+    /** Server port. */
+    private final int port;
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Incoming message listener. */
+    private final BiConsumer<InetSocketAddress, NetworkMessage> 
messageListener;
+
+    /** Server socket channel. */
+    private ServerSocketChannel channel;
+
+    /** New connections listener. */
+    private final Consumer<SocketChannel> newConnectionListener;
+
+    /**
+     * Constructor.
+     *
+     * @param port Server port.
+     * @param newConnectionListener New connections listener.
+     * @param messageListener Message listener.
+     * @param serializationRegistry Serialization registry.
+     */
+    public NettyServer(
+        int port,
+        Consumer<SocketChannel> newConnectionListener,
+        BiConsumer<InetSocketAddress, NetworkMessage> messageListener,
+        MessageSerializationRegistry serializationRegistry
+    ) {
+        this.port = port;
+        this.newConnectionListener = newConnectionListener;
+        this.messageListener = messageListener;
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /**
+     * Starts the server.
+     *
+     * @return Future that resolves when the server is successfully started.
+     */
+    public CompletableFuture<Void> start() {
+        bootstrap.group(bossGroup, workerGroup)
+            .channel(NioServerSocketChannel.class)
+            .childHandler(new ChannelInitializer<SocketChannel>() {
+                /** {@inheritDoc} */
+                @Override public void initChannel(SocketChannel ch) {
+                    ch.pipeline().addLast(
+                        /**
+                         * Decoder that uses {@link 
org.apache.ignite.network.internal.MessageReader}
+                         *  to read chunked data.

Review comment:
       formatting

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedInput;
+import java.nio.ByteBuffer;
+import org.apache.ignite.network.internal.direct.DirectMessageWriter;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.MessageSerializer;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Wrapper for a Netty {@link Channel}, that uses {@link ChunkedInput} and 
{@link DirectMessageWriter} to send data.
+ */
+public class NettySender {
+    /** Netty channel. */
+    private final Channel channel;
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /**
+     * Constructor.
+     *
+     * @param channel Netty channel.
+     * @param registry Serialization registry.
+     */
+    public NettySender(Channel channel, MessageSerializationRegistry registry) 
{
+        this.channel = channel;
+        serializationRegistry = registry;
+    }
+
+    /**
+     * Send message.
+     *
+     * @param msg Network message.
+     */
+    public void send(NetworkMessage msg) {
+        MessageSerializer<NetworkMessage> serializer = 
serializationRegistry.createSerializer(msg.directType());
+        channel.writeAndFlush(new NetworkMessageChunkedInput(msg, serializer));
+    }
+
+    /**
+     * Close channel.
+     */
+    public void close() {
+        this.channel.close().awaitUninterruptibly();
+    }
+
+    /**
+     * Chunked input for network message.
+     */
+    private static class NetworkMessageChunkedInput implements 
ChunkedInput<ByteBuf> {
+        /** Network message. */
+        private final NetworkMessage msg;
+
+        /** Message serializer. */
+        private final MessageSerializer<NetworkMessage> serializer;
+
+        /** Message writer. */
+        private final DirectMessageWriter writer = new 
DirectMessageWriter(ConnectionManager.DIRECT_PROTOCOL_VERSION);
+
+        /** Whether the message was fully written. */
+        boolean finished = false;
+
+        /**
+         * Constructor.
+         *
+         * @param msg Network message.
+         * @param serializer Serializer.
+         */
+        private NetworkMessageChunkedInput(NetworkMessage msg, 
MessageSerializer<NetworkMessage> serializer) {
+            this.msg = msg;
+            this.serializer = serializer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean isEndOfInput() throws Exception {
+            return finished;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws Exception {
+
+        }
+
+        /** {@inheritDoc} */
+        @Deprecated
+        @Override public ByteBuf readChunk(ChannelHandlerContext ctx) throws 
Exception {
+            return readChunk(ctx.alloc());
+        }
+
+        /** {@inheritDoc} */
+        @Override public ByteBuf readChunk(ByteBufAllocator allocator) throws 
Exception {
+            ByteBuf buffer = allocator.buffer(4096);

Review comment:
       This should be configurable

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.Attribute;
+import io.netty.util.AttributeKey;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.internal.MessageReader;
+import org.apache.ignite.network.internal.direct.DirectMessageReader;
+import org.apache.ignite.network.message.MessageDeserializer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Decodes {@link ByteBuf}s into {@link NetworkMessage}s.
+ */
+public class InboundDecoder extends ByteToMessageDecoder {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(InboundDecoder.class);
+
+    /** Message reader channel attribute key. */
+    private static final AttributeKey<MessageReader> READER_KEY = 
AttributeKey.valueOf("READER");
+
+    /** Message deserializer channel attribute key. */
+    private static final AttributeKey<MessageDeserializer<NetworkMessage>> 
DESERIALIZER_KEY = AttributeKey.valueOf("DESERIALIZER");
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /**
+     * Constructor.
+     *
+     * @param serializationRegistry Serialization registry.
+     */
+    public InboundDecoder(MessageSerializationRegistry serializationRegistry) {
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, 
List<Object> out) throws Exception {
+        ByteBuffer buffer = in.nioBuffer();
+
+        Attribute<MessageReader> readerAttr = ctx.attr(READER_KEY);
+        MessageReader reader = readerAttr.get();
+
+        if (reader == null) {
+            reader = new DirectMessageReader(serializationRegistry, 
ConnectionManager.DIRECT_PROTOCOL_VERSION);
+            readerAttr.set(reader);
+        }
+
+        Attribute<MessageDeserializer<NetworkMessage>> messageAttr = 
ctx.attr(DESERIALIZER_KEY);
+
+        while (buffer.hasRemaining()) {
+            MessageDeserializer<NetworkMessage> msg = messageAttr.get();
+
+            try {
+                // Read message type.
+                if (msg == null && buffer.remaining() >= 
NetworkMessage.DIRECT_TYPE_SIZE) {
+                    byte b0 = buffer.get();
+                    byte b1 = buffer.get();
+
+                    msg = 
serializationRegistry.createDeserializer(makeMessageType(b0, b1));

Review comment:
       We will implement "module direct type" later, right?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty client channel wrapper.
+ */
+public class NettyClient {
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Destination host. */
+    private final String host;
+
+    /** Destination port. */
+    private final int port;
+
+    /** Future that resolves when client channel is opened. */
+    private final CompletableFuture<NettySender> clientFuture = new 
CompletableFuture<>();
+
+    /** Client socket channel. */
+    private Channel channel;
+
+    public NettyClient(
+        String host,
+        int port,
+        MessageSerializationRegistry serializationRegistry
+    ) {
+        this.host = host;
+        this.port = port;
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /**
+     * Start client.
+     *
+     * @return Future that resolves when client channel is opened.
+     */
+    public CompletableFuture<NettySender> start(Bootstrap bootstrap) {
+        bootstrap.connect(host, port).addListener((ChannelFutureListener) 
connect -> {
+            this.channel = connect.channel();
+            if (connect.isSuccess())
+                clientFuture.complete(new NettySender(channel, 
serializationRegistry));
+            else
+                clientFuture.completeExceptionally(connect.cause());
+        });
+
+        return clientFuture;
+    }
+
+    /**
+     * @return Client start future.
+     */
+    public CompletableFuture<NettySender> sender() {
+        return clientFuture;
+    }
+
+    /**
+     * Stop client.
+     */
+    public void stop() {
+        this.channel.close().awaitUninterruptibly();
+    }
+
+    public static Bootstrap setupBootstrap(
+        EventLoopGroup eventLoopGroup,
+        MessageSerializationRegistry serializationRegistry,
+        BiConsumer<InetSocketAddress, NetworkMessage> messageListener
+    ) {
+        Bootstrap clientBootstrap = new Bootstrap();
+
+        clientBootstrap.group(eventLoopGroup)
+            .channel(NioSocketChannel.class)
+            /** See {@link NettyServer#start} for netty configuration details. 
*/

Review comment:
       Wait a minute, what's the point of making /** */ instead of /* */? IDE 
highlighting for the link?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/message/ScaleCubeMessage.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube.message;
+
+import io.scalecube.cluster.transport.api.Message;
+import java.util.Map;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Wrapper for ScaleCube's {@link Message}.
+ * {@link Message#data} is stored in {@link #array} and {@link 
Message#headers} are stored in {@link #headers}.
+ */
+public class ScaleCubeMessage implements NetworkMessage {
+    /** Direct type. */
+    public static final short TYPE = 100;

Review comment:
       100?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.net.Address;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.DirectProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+
+/**
+ * ScaleCube transport over {@link ConnectionManager}.
+ */
+public class ScaleCubeDirectMarshallerTransport implements Transport {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(Transport.class);
+
+    /** Message subject. */
+    private final DirectProcessor<Message> subject = DirectProcessor.create();
+
+    /** Message sink. */
+    private final FluxSink<Message> sink = subject.sink();
+
+    /** Close handler */
+    private final MonoProcessor<Void> stop = MonoProcessor.create();
+
+    /** On stop. */
+    private final MonoProcessor<Void> onStop = MonoProcessor.create();
+
+    /** Connection manager. */
+    private final ConnectionManager connectionManager;
+
+    /** Node address. */
+    private final Address address;
+
+    /**
+     * Constructor.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public ScaleCubeDirectMarshallerTransport(ConnectionManager 
connectionManager) {
+        this.connectionManager = connectionManager;
+        this.connectionManager.addListener(this::onMessage);
+        this.address = prepareAddress(connectionManager.getLocalAddress());
+        // Setup cleanup
+        stop.then(doStop())
+            .doFinally(s -> onStop.onComplete())
+            .subscribe(
+                null,
+                ex -> LOG.warn("[{0}][doStop] Exception occurred: {1}", 
address, ex.toString())
+            );
+    }
+
+    /**
+     * Convert {@link InetSocketAddress} to {@link Address}.
+     *
+     * @param addr Address.
+     * @return ScaleCube address.
+     */
+    private static Address prepareAddress(InetSocketAddress addr) {
+        InetAddress address = addr.getAddress();
+
+        int port = addr.getPort();
+
+        if (address.isAnyLocalAddress())
+            return 
Address.create(Address.getLocalIpAddress().getHostAddress(), port);
+        else
+            return Address.create(address.getHostAddress(), port);
+    }
+
+    /**
+     * Cleanup resources on stop.
+     *
+     * @return A mono, that resolves when the stop operation is finished.
+     */
+    private Mono<Void> doStop() {
+        return Mono.defer(() -> {
+            LOG.info("[{0}][doStop] Stopping", address);
+
+            // Complete incoming messages observable
+            sink.complete();
+
+            LOG.info("[{0}][doStop] Stopped", address);
+            return Mono.empty();
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public Address address() {
+        return address;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Transport> start() {
+        return Mono.just(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> stop() {
+        return doStop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isStopped() {
+        return onStop.isDisposed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> send(Address address, Message message) {
+        var addr = InetSocketAddress.createUnresolved(address.host(), 
address.port());
+        return Mono.defer(() -> 
Mono.fromFuture(connectionManager.channel(addr)))
+            .flatMap(client -> {
+                client.send(fromMessage(message));
+                return Mono.empty();
+            });
+    }
+
+    /**
+     * Handles new network messages from {@link #connectionManager}.
+     *
+     * @param source Message source.
+     * @param msg Network message.
+     */
+    private void onMessage(InetSocketAddress source, NetworkMessage msg) {
+        Message message = fromNetworkMessage(msg);
+
+        if (message != null)
+            sink.next(message);
+    }
+
+    /**
+     * Wrap ScaleCube {@link Message} with {@link NetworkMessage}.
+     *
+     * @param message ScaleCube message.
+     * @return Netowork message that wraps ScaleCube message.
+     * @throws IgniteInternalException If failed to write message to 
ObjectOutputStream.
+     */
+    private NetworkMessage fromMessage(Message message) throws 
IgniteInternalException {
+        Object dataObj = message.data();
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+        try(ObjectOutputStream oos = new ObjectOutputStream(stream)) {
+            oos.writeObject(dataObj);
+        }
+        catch (IOException e) {
+            throw new IgniteInternalException(e);
+        }
+
+        return new ScaleCubeMessage(stream.toByteArray(), message.headers());
+    }
+
+    /**
+     * Unwrap ScaleCube {@link Message} from {@link NetworkMessage}.
+     *
+     * @param networkMessage Network message.
+     * @return ScaleCube message.
+     * @throws IgniteInternalException If failed to read ScaleCube message 
byte array.
+     */
+    private Message fromNetworkMessage(NetworkMessage networkMessage) throws 
IgniteInternalException {
+        if (networkMessage instanceof ScaleCubeMessage) {

Review comment:
       Should we assert this instead of checking?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.internal.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.message.NetworkMessage;
+
+/**
+ * Netty server channel wrapper.
+ */
+public class NettyServer {
+    /** {@link ServerSocketChannel} bootstrapper. */
+    private final ServerBootstrap bootstrap = new ServerBootstrap();
+
+    /** Socket accepter event loop group. */
+    private final NioEventLoopGroup bossGroup = new NioEventLoopGroup();
+
+    /** Socket handler event loop group. */
+    private final NioEventLoopGroup workerGroup = new NioEventLoopGroup();
+
+    /** Server port. */
+    private final int port;
+
+    /** Serialization registry. */
+    private final MessageSerializationRegistry serializationRegistry;
+
+    /** Incoming message listener. */
+    private final BiConsumer<InetSocketAddress, NetworkMessage> 
messageListener;
+
+    /** Server socket channel. */
+    private ServerSocketChannel channel;
+
+    /** New connections listener. */
+    private final Consumer<SocketChannel> newConnectionListener;
+
+    /**
+     * Constructor.
+     *
+     * @param port Server port.
+     * @param newConnectionListener New connections listener.
+     * @param messageListener Message listener.
+     * @param serializationRegistry Serialization registry.
+     */
+    public NettyServer(
+        int port,
+        Consumer<SocketChannel> newConnectionListener,
+        BiConsumer<InetSocketAddress, NetworkMessage> messageListener,
+        MessageSerializationRegistry serializationRegistry
+    ) {
+        this.port = port;
+        this.newConnectionListener = newConnectionListener;
+        this.messageListener = messageListener;
+        this.serializationRegistry = serializationRegistry;
+    }
+
+    /**
+     * Starts the server.
+     *
+     * @return Future that resolves when the server is successfully started.
+     */
+    public CompletableFuture<Void> start() {
+        bootstrap.group(bossGroup, workerGroup)
+            .channel(NioServerSocketChannel.class)
+            .childHandler(new ChannelInitializer<SocketChannel>() {
+                /** {@inheritDoc} */
+                @Override public void initChannel(SocketChannel ch) {
+                    ch.pipeline().addLast(
+                        /**

Review comment:
       Again, why not /* ?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.net.Address;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.DirectProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+
+/**
+ * ScaleCube transport over {@link ConnectionManager}.
+ */
+public class ScaleCubeDirectMarshallerTransport implements Transport {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(Transport.class);
+
+    /** Message subject. */
+    private final DirectProcessor<Message> subject = DirectProcessor.create();
+
+    /** Message sink. */
+    private final FluxSink<Message> sink = subject.sink();
+
+    /** Close handler */
+    private final MonoProcessor<Void> stop = MonoProcessor.create();
+
+    /** On stop. */
+    private final MonoProcessor<Void> onStop = MonoProcessor.create();
+
+    /** Connection manager. */
+    private final ConnectionManager connectionManager;
+
+    /** Node address. */
+    private final Address address;
+
+    /**
+     * Constructor.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public ScaleCubeDirectMarshallerTransport(ConnectionManager 
connectionManager) {
+        this.connectionManager = connectionManager;
+        this.connectionManager.addListener(this::onMessage);
+        this.address = prepareAddress(connectionManager.getLocalAddress());
+        // Setup cleanup
+        stop.then(doStop())
+            .doFinally(s -> onStop.onComplete())
+            .subscribe(
+                null,
+                ex -> LOG.warn("[{0}][doStop] Exception occurred: {1}", 
address, ex.toString())
+            );
+    }
+
+    /**
+     * Convert {@link InetSocketAddress} to {@link Address}.
+     *
+     * @param addr Address.
+     * @return ScaleCube address.
+     */
+    private static Address prepareAddress(InetSocketAddress addr) {
+        InetAddress address = addr.getAddress();
+
+        int port = addr.getPort();
+
+        if (address.isAnyLocalAddress())
+            return 
Address.create(Address.getLocalIpAddress().getHostAddress(), port);
+        else
+            return Address.create(address.getHostAddress(), port);
+    }
+
+    /**
+     * Cleanup resources on stop.
+     *
+     * @return A mono, that resolves when the stop operation is finished.
+     */
+    private Mono<Void> doStop() {
+        return Mono.defer(() -> {
+            LOG.info("[{0}][doStop] Stopping", address);
+
+            // Complete incoming messages observable
+            sink.complete();
+
+            LOG.info("[{0}][doStop] Stopped", address);
+            return Mono.empty();
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public Address address() {
+        return address;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Transport> start() {
+        return Mono.just(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> stop() {
+        return doStop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isStopped() {
+        return onStop.isDisposed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> send(Address address, Message message) {
+        var addr = InetSocketAddress.createUnresolved(address.host(), 
address.port());
+        return Mono.defer(() -> 
Mono.fromFuture(connectionManager.channel(addr)))
+            .flatMap(client -> {
+                client.send(fromMessage(message));
+                return Mono.empty();
+            });
+    }
+
+    /**
+     * Handles new network messages from {@link #connectionManager}.
+     *
+     * @param source Message source.
+     * @param msg Network message.
+     */
+    private void onMessage(InetSocketAddress source, NetworkMessage msg) {
+        Message message = fromNetworkMessage(msg);
+
+        if (message != null)
+            sink.next(message);
+    }
+
+    /**
+     * Wrap ScaleCube {@link Message} with {@link NetworkMessage}.
+     *
+     * @param message ScaleCube message.
+     * @return Netowork message that wraps ScaleCube message.
+     * @throws IgniteInternalException If failed to write message to 
ObjectOutputStream.
+     */
+    private NetworkMessage fromMessage(Message message) throws 
IgniteInternalException {
+        Object dataObj = message.data();
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+        try(ObjectOutputStream oos = new ObjectOutputStream(stream)) {
+            oos.writeObject(dataObj);
+        }
+        catch (IOException e) {
+            throw new IgniteInternalException(e);
+        }
+
+        return new ScaleCubeMessage(stream.toByteArray(), message.headers());
+    }
+
+    /**
+     * Unwrap ScaleCube {@link Message} from {@link NetworkMessage}.
+     *
+     * @param networkMessage Network message.
+     * @return ScaleCube message.
+     * @throws IgniteInternalException If failed to read ScaleCube message 
byte array.
+     */
+    private Message fromNetworkMessage(NetworkMessage networkMessage) throws 
IgniteInternalException {
+        if (networkMessage instanceof ScaleCubeMessage) {
+            ScaleCubeMessage msg = (ScaleCubeMessage) networkMessage;
+
+            Map<String, String> headers = msg.getHeaders();
+
+            Object obj;
+
+            try(ObjectInputStream ois = new ObjectInputStream(new 
ByteArrayInputStream(msg.getArray()))) {

Review comment:
       formatting

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.net.Address;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.DirectProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+
+/**
+ * ScaleCube transport over {@link ConnectionManager}.
+ */
+public class ScaleCubeDirectMarshallerTransport implements Transport {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(Transport.class);
+
+    /** Message subject. */
+    private final DirectProcessor<Message> subject = DirectProcessor.create();
+
+    /** Message sink. */
+    private final FluxSink<Message> sink = subject.sink();
+
+    /** Close handler */
+    private final MonoProcessor<Void> stop = MonoProcessor.create();
+
+    /** On stop. */
+    private final MonoProcessor<Void> onStop = MonoProcessor.create();
+
+    /** Connection manager. */
+    private final ConnectionManager connectionManager;
+
+    /** Node address. */
+    private final Address address;
+
+    /**
+     * Constructor.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public ScaleCubeDirectMarshallerTransport(ConnectionManager 
connectionManager) {
+        this.connectionManager = connectionManager;
+        this.connectionManager.addListener(this::onMessage);
+        this.address = prepareAddress(connectionManager.getLocalAddress());
+        // Setup cleanup
+        stop.then(doStop())
+            .doFinally(s -> onStop.onComplete())
+            .subscribe(
+                null,
+                ex -> LOG.warn("[{0}][doStop] Exception occurred: {1}", 
address, ex.toString())
+            );
+    }
+
+    /**
+     * Convert {@link InetSocketAddress} to {@link Address}.
+     *
+     * @param addr Address.
+     * @return ScaleCube address.
+     */
+    private static Address prepareAddress(InetSocketAddress addr) {
+        InetAddress address = addr.getAddress();
+
+        int port = addr.getPort();
+
+        if (address.isAnyLocalAddress())
+            return 
Address.create(Address.getLocalIpAddress().getHostAddress(), port);
+        else
+            return Address.create(address.getHostAddress(), port);
+    }
+
+    /**
+     * Cleanup resources on stop.
+     *
+     * @return A mono, that resolves when the stop operation is finished.
+     */
+    private Mono<Void> doStop() {
+        return Mono.defer(() -> {
+            LOG.info("[{0}][doStop] Stopping", address);
+
+            // Complete incoming messages observable
+            sink.complete();
+
+            LOG.info("[{0}][doStop] Stopped", address);
+            return Mono.empty();
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public Address address() {
+        return address;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Transport> start() {
+        return Mono.just(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> stop() {
+        return doStop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isStopped() {
+        return onStop.isDisposed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> send(Address address, Message message) {
+        var addr = InetSocketAddress.createUnresolved(address.host(), 
address.port());
+        return Mono.defer(() -> 
Mono.fromFuture(connectionManager.channel(addr)))
+            .flatMap(client -> {
+                client.send(fromMessage(message));
+                return Mono.empty();
+            });
+    }
+
+    /**
+     * Handles new network messages from {@link #connectionManager}.
+     *
+     * @param source Message source.
+     * @param msg Network message.
+     */
+    private void onMessage(InetSocketAddress source, NetworkMessage msg) {
+        Message message = fromNetworkMessage(msg);
+
+        if (message != null)
+            sink.next(message);
+    }
+
+    /**
+     * Wrap ScaleCube {@link Message} with {@link NetworkMessage}.
+     *
+     * @param message ScaleCube message.
+     * @return Netowork message that wraps ScaleCube message.
+     * @throws IgniteInternalException If failed to write message to 
ObjectOutputStream.
+     */
+    private NetworkMessage fromMessage(Message message) throws 
IgniteInternalException {
+        Object dataObj = message.data();
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+        try(ObjectOutputStream oos = new ObjectOutputStream(stream)) {

Review comment:
       formatting

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.net.Address;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.DirectProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+
+/**
+ * ScaleCube transport over {@link ConnectionManager}.
+ */
+public class ScaleCubeDirectMarshallerTransport implements Transport {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(Transport.class);
+
+    /** Message subject. */
+    private final DirectProcessor<Message> subject = DirectProcessor.create();
+
+    /** Message sink. */
+    private final FluxSink<Message> sink = subject.sink();
+
+    /** Close handler */
+    private final MonoProcessor<Void> stop = MonoProcessor.create();
+
+    /** On stop. */
+    private final MonoProcessor<Void> onStop = MonoProcessor.create();
+
+    /** Connection manager. */
+    private final ConnectionManager connectionManager;
+
+    /** Node address. */
+    private final Address address;
+
+    /**
+     * Constructor.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public ScaleCubeDirectMarshallerTransport(ConnectionManager 
connectionManager) {
+        this.connectionManager = connectionManager;
+        this.connectionManager.addListener(this::onMessage);
+        this.address = prepareAddress(connectionManager.getLocalAddress());
+        // Setup cleanup
+        stop.then(doStop())
+            .doFinally(s -> onStop.onComplete())
+            .subscribe(
+                null,
+                ex -> LOG.warn("[{0}][doStop] Exception occurred: {1}", 
address, ex.toString())
+            );
+    }
+
+    /**
+     * Convert {@link InetSocketAddress} to {@link Address}.
+     *
+     * @param addr Address.
+     * @return ScaleCube address.
+     */
+    private static Address prepareAddress(InetSocketAddress addr) {
+        InetAddress address = addr.getAddress();
+
+        int port = addr.getPort();
+
+        if (address.isAnyLocalAddress())
+            return 
Address.create(Address.getLocalIpAddress().getHostAddress(), port);
+        else
+            return Address.create(address.getHostAddress(), port);
+    }
+
+    /**
+     * Cleanup resources on stop.
+     *
+     * @return A mono, that resolves when the stop operation is finished.
+     */
+    private Mono<Void> doStop() {
+        return Mono.defer(() -> {
+            LOG.info("[{0}][doStop] Stopping", address);
+
+            // Complete incoming messages observable
+            sink.complete();
+
+            LOG.info("[{0}][doStop] Stopped", address);
+            return Mono.empty();
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public Address address() {
+        return address;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Transport> start() {
+        return Mono.just(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> stop() {
+        return doStop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isStopped() {
+        return onStop.isDisposed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> send(Address address, Message message) {
+        var addr = InetSocketAddress.createUnresolved(address.host(), 
address.port());
+        return Mono.defer(() -> 
Mono.fromFuture(connectionManager.channel(addr)))
+            .flatMap(client -> {
+                client.send(fromMessage(message));
+                return Mono.empty();

Review comment:
       So we don't wait for acks here, but why?

##########
File path: 
modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network.scalecube;
+
+import io.scalecube.cluster.transport.api.Message;
+import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.net.Address;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.internal.netty.ConnectionManager;
+import org.apache.ignite.network.message.NetworkMessage;
+import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.DirectProcessor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
+
+/**
+ * ScaleCube transport over {@link ConnectionManager}.
+ */
+public class ScaleCubeDirectMarshallerTransport implements Transport {
+    /** Logger. */
+    private static final IgniteLogger LOG = 
IgniteLogger.forClass(Transport.class);
+
+    /** Message subject. */
+    private final DirectProcessor<Message> subject = DirectProcessor.create();
+
+    /** Message sink. */
+    private final FluxSink<Message> sink = subject.sink();
+
+    /** Close handler */
+    private final MonoProcessor<Void> stop = MonoProcessor.create();
+
+    /** On stop. */
+    private final MonoProcessor<Void> onStop = MonoProcessor.create();
+
+    /** Connection manager. */
+    private final ConnectionManager connectionManager;
+
+    /** Node address. */
+    private final Address address;
+
+    /**
+     * Constructor.
+     *
+     * @param connectionManager Connection manager.
+     */
+    public ScaleCubeDirectMarshallerTransport(ConnectionManager 
connectionManager) {
+        this.connectionManager = connectionManager;
+        this.connectionManager.addListener(this::onMessage);
+        this.address = prepareAddress(connectionManager.getLocalAddress());
+        // Setup cleanup
+        stop.then(doStop())
+            .doFinally(s -> onStop.onComplete())
+            .subscribe(
+                null,
+                ex -> LOG.warn("[{0}][doStop] Exception occurred: {1}", 
address, ex.toString())
+            );
+    }
+
+    /**
+     * Convert {@link InetSocketAddress} to {@link Address}.
+     *
+     * @param addr Address.
+     * @return ScaleCube address.
+     */
+    private static Address prepareAddress(InetSocketAddress addr) {
+        InetAddress address = addr.getAddress();
+
+        int port = addr.getPort();
+
+        if (address.isAnyLocalAddress())
+            return 
Address.create(Address.getLocalIpAddress().getHostAddress(), port);
+        else
+            return Address.create(address.getHostAddress(), port);
+    }
+
+    /**
+     * Cleanup resources on stop.
+     *
+     * @return A mono, that resolves when the stop operation is finished.
+     */
+    private Mono<Void> doStop() {
+        return Mono.defer(() -> {
+            LOG.info("[{0}][doStop] Stopping", address);
+
+            // Complete incoming messages observable
+            sink.complete();
+
+            LOG.info("[{0}][doStop] Stopped", address);
+            return Mono.empty();
+        });
+    }
+
+    /** {@inheritDoc} */
+    @Override public Address address() {
+        return address;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Transport> start() {
+        return Mono.just(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> stop() {
+        return doStop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isStopped() {
+        return onStop.isDisposed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Mono<Void> send(Address address, Message message) {
+        var addr = InetSocketAddress.createUnresolved(address.host(), 
address.port());
+        return Mono.defer(() -> 
Mono.fromFuture(connectionManager.channel(addr)))
+            .flatMap(client -> {
+                client.send(fromMessage(message));
+                return Mono.empty();
+            });
+    }
+
+    /**
+     * Handles new network messages from {@link #connectionManager}.
+     *
+     * @param source Message source.
+     * @param msg Network message.
+     */
+    private void onMessage(InetSocketAddress source, NetworkMessage msg) {
+        Message message = fromNetworkMessage(msg);
+
+        if (message != null)
+            sink.next(message);
+    }
+
+    /**
+     * Wrap ScaleCube {@link Message} with {@link NetworkMessage}.
+     *
+     * @param message ScaleCube message.
+     * @return Netowork message that wraps ScaleCube message.
+     * @throws IgniteInternalException If failed to write message to 
ObjectOutputStream.
+     */
+    private NetworkMessage fromMessage(Message message) throws 
IgniteInternalException {
+        Object dataObj = message.data();
+        ByteArrayOutputStream stream = new ByteArrayOutputStream();
+
+        try(ObjectOutputStream oos = new ObjectOutputStream(stream)) {
+            oos.writeObject(dataObj);
+        }
+        catch (IOException e) {
+            throw new IgniteInternalException(e);
+        }
+
+        return new ScaleCubeMessage(stream.toByteArray(), message.headers());
+    }
+
+    /**
+     * Unwrap ScaleCube {@link Message} from {@link NetworkMessage}.
+     *
+     * @param networkMessage Network message.
+     * @return ScaleCube message.
+     * @throws IgniteInternalException If failed to read ScaleCube message 
byte array.
+     */
+    private Message fromNetworkMessage(NetworkMessage networkMessage) throws 
IgniteInternalException {
+        if (networkMessage instanceof ScaleCubeMessage) {
+            ScaleCubeMessage msg = (ScaleCubeMessage) networkMessage;
+
+            Map<String, String> headers = msg.getHeaders();
+
+            Object obj;
+
+            try(ObjectInputStream ois = new ObjectInputStream(new 
ByteArrayInputStream(msg.getArray()))) {
+                obj = ois.readObject();
+            }
+            catch (Exception e) {
+                throw new IgniteInternalException(e);

Review comment:
       How will scalecube react to IgniteInternalException? I think we need 
something else.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to