sashapolo commented on a change in pull request #102: URL: https://github.com/apache/ignite-3/pull/102#discussion_r623745503
########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectUtils.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.internal.direct; + +import java.nio.ByteBuffer; + +/** + * Direct marshalling utils. + */ +public class DirectUtils { Review comment: Should it be called `DirectMarshallingUtils`? ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/direct/DirectUtils.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.internal.direct; + +import java.nio.ByteBuffer; + +/** + * Direct marshalling utils. + */ +public class DirectUtils { + /** + * Reads a direct message type from a byte buffer. + * + * @param buffer Byte buffer. + * @return Direct message type. + */ + public static short getMessageType(ByteBuffer buffer) { Review comment: This method is only used in one place, why do you need a whole class for it? ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BiConsumer; +import org.apache.ignite.lang.IgniteInternalException; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Class that manages connections both incoming and outgoing. + */ +public class ConnectionManager { + /** Latest version of the direct marshalling protocol. */ + public static final byte DIRECT_PROTOCOL_VERSION = 1; + + /** Client bootstrap. */ + private final Bootstrap clientBootstrap; + + /** Client socket channel handler event loop group. */ + private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup(); + + /** Server. */ + private final NettyServer server; + + /** Channels. */ + private final Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>(); + + /** Clients. */ + private final Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>(); + + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** Message listeners. 
*/ + private final List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = new CopyOnWriteArrayList<>(new ArrayList<>()); + + public ConnectionManager(int port, MessageSerializationRegistry provider) { Review comment: missing comment =) ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BiConsumer; +import org.apache.ignite.lang.IgniteInternalException; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Class that manages connections both incoming and outgoing. 
+ */ +public class ConnectionManager { + /** Latest version of the direct marshalling protocol. */ + public static final byte DIRECT_PROTOCOL_VERSION = 1; + + /** Client bootstrap. */ + private final Bootstrap clientBootstrap; + + /** Client socket channel handler event loop group. */ + private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup(); + + /** Server. */ + private final NettyServer server; + + /** Channels. */ + private final Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>(); + + /** Clients. */ + private final Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>(); + + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** Message listeners. */ + private final List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = new CopyOnWriteArrayList<>(new ArrayList<>()); + + public ConnectionManager(int port, MessageSerializationRegistry provider) { + this.serializationRegistry = provider; + this.server = new NettyServer(port, this::onNewIncomingChannel, this::onMessage, serializationRegistry); + this.clientBootstrap = NettyClient.createBootstrap(clientWorkerGroup, serializationRegistry, this::onMessage); + } + + /** + * Start server. + * + * @throws IgniteInternalException If failed to start. + */ + public void start() throws IgniteInternalException { + try { + server.start().join(); + } + catch (CompletionException e) { + Throwable cause = e.getCause(); + throw new IgniteInternalException("Failed to start server: " + cause.getMessage(), cause); + } + } + + /** + * @return Server local address. + */ + public InetSocketAddress getLocalAddress() { + return server.address(); + } + + /** + * Get a {@link NettySender}, that sends data from this node to another node with the specified address. + * @param address Another node's address. + * @return Sender. 
+ */ + public CompletableFuture<NettySender> channel(InetSocketAddress address) { + NettySender channel = channels.compute(address, (addr, sender) -> { + if (sender == null || !sender.isOpen()) + return null; + + return sender; + }); + + if (channel == null) { Review comment: I would suggest writing: ``` if (channel != null) return CompletableFuture.completedFuture(channel); ``` I think it's more readable ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BiConsumer; +import org.apache.ignite.lang.IgniteInternalException; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Class that manages connections both incoming and outgoing. + */ +public class ConnectionManager { + /** Latest version of the direct marshalling protocol. */ + public static final byte DIRECT_PROTOCOL_VERSION = 1; + + /** Client bootstrap. */ + private final Bootstrap clientBootstrap; + + /** Client socket channel handler event loop group. */ + private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup(); + + /** Server. */ + private final NettyServer server; + + /** Channels. */ + private final Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>(); + + /** Clients. */ + private final Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>(); + + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** Message listeners. 
*/ + private final List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = new CopyOnWriteArrayList<>(new ArrayList<>()); + + public ConnectionManager(int port, MessageSerializationRegistry provider) { + this.serializationRegistry = provider; + this.server = new NettyServer(port, this::onNewIncomingChannel, this::onMessage, serializationRegistry); + this.clientBootstrap = NettyClient.createBootstrap(clientWorkerGroup, serializationRegistry, this::onMessage); + } + + /** + * Start server. + * + * @throws IgniteInternalException If failed to start. + */ + public void start() throws IgniteInternalException { + try { + server.start().join(); + } + catch (CompletionException e) { + Throwable cause = e.getCause(); + throw new IgniteInternalException("Failed to start server: " + cause.getMessage(), cause); + } + } + + /** + * @return Server local address. + */ + public InetSocketAddress getLocalAddress() { + return server.address(); + } + + /** + * Get a {@link NettySender}, that sends data from this node to another node with the specified address. + * @param address Another node's address. + * @return Sender. + */ + public CompletableFuture<NettySender> channel(InetSocketAddress address) { + NettySender channel = channels.compute(address, (addr, sender) -> { + if (sender == null || !sender.isOpen()) + return null; + + return sender; + }); + + if (channel == null) { + return clients.compute(address, (addr, client) -> { Review comment: can you extract the `compute` result into a variable? I think it would be more readable ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.stream.ChunkedWriteHandler; +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Netty client channel wrapper. + */ +public class NettyClient { + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** Destination host. */ + private final String host; + + /** Destination port. */ + private final int port; + + /** Future that resolves when client channel is opened. */ + private final CompletableFuture<NettySender> clientFuture = new CompletableFuture<>(); + + /** Client channel. 
*/ + private SocketChannel channel; + + public NettyClient( Review comment: missing comment =) ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettySender.java ########## @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.stream.ChunkedInput; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.network.internal.direct.DirectMessageWriter; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.MessageSerializer; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Wrapper for a Netty {@link Channel}, that uses {@link ChunkedInput} and {@link DirectMessageWriter} to send data. + */ +public class NettySender { + /** Netty channel. 
*/ + private final SocketChannel channel; + + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** + * Constructor. + * + * @param channel Netty channel. + * @param registry Serialization registry. + */ + public NettySender(SocketChannel channel, MessageSerializationRegistry registry) { + this.channel = channel; + serializationRegistry = registry; + } + + /** + * Send message. + * + * @param msg Network message. + */ + public CompletableFuture<Void> send(NetworkMessage msg) { + MessageSerializer<NetworkMessage> serializer = serializationRegistry.createSerializer(msg.directType()); + + ChannelFuture future = channel.writeAndFlush(new NetworkMessageChunkedInput(msg, serializer)); + + CompletableFuture<Void> fut = new CompletableFuture<>(); + + future.addListener(sent -> { + if (sent.isSuccess()) + fut.complete(null); + else + fut.completeExceptionally(sent.cause()); + }); + + return fut; + } + + /** + * Close channel. + */ + public void close() { + this.channel.close().awaitUninterruptibly(); + } + + /** + * @return Gets the remote address of the channel. + */ + public InetSocketAddress remoteAddress() { + return this.channel.remoteAddress(); + } + + /** + * @return {@code true} if channel is open, {@code false} otherwise. + */ + public boolean isOpen() { + return this.channel.isOpen(); + } + + /** + * Chunked input for network message. + */ + private static class NetworkMessageChunkedInput implements ChunkedInput<ByteBuf> { + /** Network message. */ + private final NetworkMessage msg; + + /** Message serializer. */ + private final MessageSerializer<NetworkMessage> serializer; + + /** Message writer. */ + private final DirectMessageWriter writer = new DirectMessageWriter(ConnectionManager.DIRECT_PROTOCOL_VERSION); + + /** Whether the message was fully written. 
*/ + boolean finished = false; Review comment: ```suggestion private boolean finished = false; ``` ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.ignite.lang.IgniteLogger; +import org.apache.ignite.network.internal.MessageReader; +import org.apache.ignite.network.internal.direct.DirectMessageReader; +import org.apache.ignite.network.internal.direct.DirectUtils; +import org.apache.ignite.network.message.MessageDeserializer; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Decodes {@link ByteBuf}s into {@link NetworkMessage}s. + */ +public class InboundDecoder extends ByteToMessageDecoder { + /** Logger. 
*/ + private static final IgniteLogger LOG = IgniteLogger.forClass(InboundDecoder.class); + + /** Message reader channel attribute key. */ + private static final AttributeKey<MessageReader> READER_KEY = AttributeKey.valueOf("READER"); + + /** Message deserializer channel attribute key. */ + private static final AttributeKey<MessageDeserializer<NetworkMessage>> DESERIALIZER_KEY = AttributeKey.valueOf("DESERIALIZER"); + + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** + * Constructor. + * + * @param serializationRegistry Serialization registry. + */ + public InboundDecoder(MessageSerializationRegistry serializationRegistry) { + this.serializationRegistry = serializationRegistry; + } + + /** {@inheritDoc} */ + @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { + ByteBuffer buffer = in.nioBuffer(); + + Attribute<MessageReader> readerAttr = ctx.attr(READER_KEY); Review comment: `attr` is deprecated =( Looks like you should use the channel attributes as you originally did, sorry... ########## File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.scalecube; + +import io.scalecube.cluster.transport.api.Message; +import io.scalecube.cluster.transport.api.Transport; +import io.scalecube.net.Address; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.Objects; +import org.apache.ignite.lang.IgniteInternalException; +import org.apache.ignite.lang.IgniteLogger; +import org.apache.ignite.network.internal.netty.ConnectionManager; +import org.apache.ignite.network.message.NetworkMessage; +import org.apache.ignite.network.scalecube.message.ScaleCubeMessage; +import reactor.core.Disposable; +import reactor.core.Disposables; +import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; + +/** + * ScaleCube transport over {@link ConnectionManager}. + */ +public class ScaleCubeDirectMarshallerTransport implements Transport { + /** Logger. */ + private static final IgniteLogger LOG = IgniteLogger.forClass(Transport.class); + + /** Message subject. */ + private final DirectProcessor<Message> subject = DirectProcessor.create(); + + /** Message sink. */ + private final FluxSink<Message> sink = subject.sink(); + + /** Close handler */ + private final MonoProcessor<Void> stop = MonoProcessor.create(); + + /** On stop. */ + private final MonoProcessor<Void> onStop = MonoProcessor.create(); + + /** Connection manager. */ + private final ConnectionManager connectionManager; + + /** Node address. */ + private final Address address; + + /** + * Constructor. + * + * @param connectionManager Connection manager. 
+ */ + public ScaleCubeDirectMarshallerTransport(ConnectionManager connectionManager) { + this.connectionManager = connectionManager; + this.connectionManager.addListener(this::onMessage); + this.address = prepareAddress(connectionManager.getLocalAddress()); + // Setup cleanup + stop.then(doStop()) + .doFinally(s -> onStop.onComplete()) + .subscribe( + null, + ex -> LOG.warn("[{0}][doStop] Exception occurred: {1}", address, ex.toString()) + ); + } + + /** + * Convert {@link InetSocketAddress} to {@link Address}. + * + * @param addr Address. + * @return ScaleCube address. + */ + private static Address prepareAddress(InetSocketAddress addr) { + InetAddress address = addr.getAddress(); + + int port = addr.getPort(); + + if (address.isAnyLocalAddress()) + return Address.create(Address.getLocalIpAddress().getHostAddress(), port); + else + return Address.create(address.getHostAddress(), port); + } + + /** + * Cleanup resources on stop. + * + * @return A mono, that resolves when the stop operation is finished. 
+ */ + private Mono<Void> doStop() { + return Mono.defer(() -> { + LOG.info("[{0}][doStop] Stopping", address); + + // Complete incoming messages observable + sink.complete(); + + LOG.info("[{0}][doStop] Stopped", address); + return Mono.empty(); + }); + } + + /** {@inheritDoc} */ + @Override public Address address() { + return address; + } + + /** {@inheritDoc} */ + @Override public Mono<Transport> start() { + return Mono.just(this); + } + + /** {@inheritDoc} */ + @Override public Mono<Void> stop() { + return doStop(); + } + + /** {@inheritDoc} */ + @Override public boolean isStopped() { + return onStop.isDisposed(); + } + + /** {@inheritDoc} */ + @Override public Mono<Void> send(Address address, Message message) { + var addr = InetSocketAddress.createUnresolved(address.host(), address.port()); + return Mono.defer(() -> Mono.fromFuture(connectionManager.channel(addr))) + .flatMap(client -> { + return Mono.fromFuture(client.send(fromMessage(message))); + }); + } + + /** + * Handles new network messages from {@link #connectionManager}. + * + * @param source Message source. + * @param msg Network message. + */ + private void onMessage(InetSocketAddress source, NetworkMessage msg) { + Message message = fromNetworkMessage(msg); + + if (message != null) + sink.next(message); + } + + /** + * Wrap ScaleCube {@link Message} with {@link NetworkMessage}. + * + * @param message ScaleCube message. + * @return Network message that wraps ScaleCube message. + * @throws IgniteInternalException If failed to write message to ObjectOutputStream. 
+ */ + private NetworkMessage fromMessage(Message message) throws IgniteInternalException { + Object dataObj = message.data(); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + + try (ObjectOutputStream oos = new ObjectOutputStream(stream)) { + oos.writeObject(dataObj); + } + catch (IOException e) { + throw new IgniteInternalException(e); + } + + return new ScaleCubeMessage(stream.toByteArray(), message.headers()); + } + + /** + * Unwrap ScaleCube {@link Message} from {@link NetworkMessage}. + * + * @param networkMessage Network message. + * @return ScaleCube message. + * @throws IgniteInternalException If failed to read ScaleCube message byte array. + */ + private Message fromNetworkMessage(NetworkMessage networkMessage) throws IgniteInternalException { Review comment: ```suggestion @Nullable private Message fromNetworkMessage(NetworkMessage networkMessage) throws IgniteInternalException { ``` ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.stream.ChunkedWriteHandler; +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Netty server channel wrapper. + */ +public class NettyServer { + /** {@link ServerSocketChannel} bootstrapper. */ + private final ServerBootstrap bootstrap = new ServerBootstrap(); + + /** Socket accepter event loop group. */ + private final NioEventLoopGroup bossGroup = new NioEventLoopGroup(); + + /** Socket handler event loop group. */ + private final NioEventLoopGroup workerGroup = new NioEventLoopGroup(); + + /** Server port. */ + private final int port; + + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** Incoming message listener. */ + private final BiConsumer<InetSocketAddress, NetworkMessage> messageListener; + + /** Server socket channel. */ + private ServerSocketChannel channel; + + /** New connections listener. */ + private final Consumer<SocketChannel> newConnectionListener; + + /** + * Constructor. + * + * @param port Server port. + * @param newConnectionListener New connections listener. + * @param messageListener Message listener. + * @param serializationRegistry Serialization registry. 
+ */ + public NettyServer( + int port, + Consumer<SocketChannel> newConnectionListener, + BiConsumer<InetSocketAddress, NetworkMessage> messageListener, + MessageSerializationRegistry serializationRegistry + ) { + this.port = port; + this.newConnectionListener = newConnectionListener; + this.messageListener = messageListener; + this.serializationRegistry = serializationRegistry; + } + + /** + * Start server. + * + * @return Future that resolves when server is successfully started. + */ + public CompletableFuture<Void> start() { + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer<SocketChannel>() { + /** {@inheritDoc} */ + @Override public void initChannel(SocketChannel ch) + throws Exception { + ch.pipeline().addLast( + /** + * Decoder that uses {@link org.apache.ignite.network.internal.MessageReader} + * to read chunked data. + */ + new InboundDecoder(serializationRegistry), + /** Handles decoded {@link NetworkMessage}s. */ + new MessageHandler(messageListener), + /** + * Encoder that uses {@link org.apache.ignite.network.internal.MessageWriter} + * to write chunked data. + */ + new ChunkedWriteHandler() + ); + + newConnectionListener.accept(ch); + } + }) + /** + * The maximum queue length for incoming connection indications (a request to connect) is set + * to the backlog parameter. If a connection indication arrives when the queue is full, + * the connection is refused. + */ + .option(ChannelOption.SO_BACKLOG, 128) + /** + * When the keepalive option is set for a TCP socket and no data has been exchanged across the socket + * in either direction for 2 hours (NOTE: the actual value is implementation dependent), + * TCP automatically sends a keepalive probe to the peer. 
+ */ + .childOption(ChannelOption.SO_KEEPALIVE, true); + + CompletableFuture<Void> serverStartFuture = new CompletableFuture<>(); + + ChannelFuture bindFuture = bootstrap.bind(port); + + bindFuture.addListener(bind -> { + this.channel = (ServerSocketChannel) bindFuture.channel(); + + if (bind.isSuccess()) + serverStartFuture.complete(null); + else { + Throwable cause = bind.cause(); + serverStartFuture.completeExceptionally(cause); + } + + // Shutdown event loops on server stop. + channel.closeFuture().addListener(close -> { + workerGroup.shutdownGracefully(); Review comment: > (and need) I think there might be some need for that, since there can still be some event processing happening... ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BiConsumer; +import org.apache.ignite.lang.IgniteInternalException; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Class that manages connections both incoming and outgoing. + */ +public class ConnectionManager { + /** Latest version of the direct marshalling protocol. */ + public static final byte DIRECT_PROTOCOL_VERSION = 1; + + /** Client bootstrap. */ + private final Bootstrap clientBootstrap; + + /** Client socket channel handler event loop group. */ + private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup(); + + /** Server. */ + private final NettyServer server; + + /** Channels. */ + private final Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>(); + + /** Clients. */ + private final Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>(); + + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** Message listeners. 
*/ + private final List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = new CopyOnWriteArrayList<>(new ArrayList<>()); + + public ConnectionManager(int port, MessageSerializationRegistry provider) { + this.serializationRegistry = provider; + this.server = new NettyServer(port, this::onNewIncomingChannel, this::onMessage, serializationRegistry); + this.clientBootstrap = NettyClient.createBootstrap(clientWorkerGroup, serializationRegistry, this::onMessage); + } + + /** + * Start server. + * + * @throws IgniteInternalException If failed to start. + */ + public void start() throws IgniteInternalException { + try { + server.start().join(); + } + catch (CompletionException e) { + Throwable cause = e.getCause(); + throw new IgniteInternalException("Failed to start server: " + cause.getMessage(), cause); + } + } + + /** + * @return Server local address. + */ + public InetSocketAddress getLocalAddress() { + return server.address(); + } + + /** + * Get a {@link NettySender}, that sends data from this node to another node with the specified address. + * @param address Another node's address. + * @return Sender. + */ + public CompletableFuture<NettySender> channel(InetSocketAddress address) { + NettySender channel = channels.compute(address, (addr, sender) -> { + if (sender == null || !sender.isOpen()) + return null; + + return sender; + }); + + if (channel == null) { + return clients.compute(address, (addr, client) -> { + if (client != null && !client.failedToConnect() && !client.isDisconnected()) + return client; + + return this.connect(addr); Review comment: ```suggestion return connect(addr); ``` ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BiConsumer; +import org.apache.ignite.lang.IgniteInternalException; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Class that manages connections both incoming and outgoing. + */ +public class ConnectionManager { + /** Latest version of the direct marshalling protocol. */ + public static final byte DIRECT_PROTOCOL_VERSION = 1; + + /** Client bootstrap. */ + private final Bootstrap clientBootstrap; + + /** Client socket channel handler event loop group. */ + private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup(); + + /** Server. */ + private final NettyServer server; + + /** Channels. */ + private final Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>(); + + /** Clients. 
*/ + private final Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>(); + + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** Message listeners. */ + private final List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = new CopyOnWriteArrayList<>(new ArrayList<>()); Review comment: You can write `new CopyOnWriteArrayList<>()`, passing an empty list is redundant ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BiConsumer; +import org.apache.ignite.lang.IgniteInternalException; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Class that manages connections both incoming and outgoing. + */ +public class ConnectionManager { + /** Latest version of the direct marshalling protocol. */ + public static final byte DIRECT_PROTOCOL_VERSION = 1; + + /** Client bootstrap. */ + private final Bootstrap clientBootstrap; + + /** Client socket channel handler event loop group. */ + private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup(); + + /** Server. */ + private final NettyServer server; + + /** Channels. */ + private final Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>(); + + /** Clients. */ + private final Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>(); + + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** Message listeners. 
*/ + private final List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = new CopyOnWriteArrayList<>(new ArrayList<>()); + + public ConnectionManager(int port, MessageSerializationRegistry provider) { + this.serializationRegistry = provider; + this.server = new NettyServer(port, this::onNewIncomingChannel, this::onMessage, serializationRegistry); + this.clientBootstrap = NettyClient.createBootstrap(clientWorkerGroup, serializationRegistry, this::onMessage); + } + + /** + * Start server. + * + * @throws IgniteInternalException If failed to start. + */ + public void start() throws IgniteInternalException { + try { + server.start().join(); + } + catch (CompletionException e) { + Throwable cause = e.getCause(); + throw new IgniteInternalException("Failed to start server: " + cause.getMessage(), cause); + } + } + + /** + * @return Server local address. + */ + public InetSocketAddress getLocalAddress() { + return server.address(); + } + + /** + * Get a {@link NettySender}, that sends data from this node to another node with the specified address. + * @param address Another node's address. + * @return Sender. + */ + public CompletableFuture<NettySender> channel(InetSocketAddress address) { + NettySender channel = channels.compute(address, (addr, sender) -> { Review comment: Can we use a ternary operator? Or is it prohibited by the code style? For example: ``` NettySender channel = channels.compute( address, (addr, sender) -> sender == null || !sender.isOpen() ? null : sender ); ``` ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.stream.ChunkedWriteHandler; +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Netty client channel wrapper. + */ +public class NettyClient { + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** Destination host. */ + private final String host; + + /** Destination port. */ + private final int port; + + /** Future that resolves when client channel is opened. */ + private final CompletableFuture<NettySender> clientFuture = new CompletableFuture<>(); + + /** Client channel. */ + private SocketChannel channel; + + public NettyClient( + String host, + int port, + MessageSerializationRegistry serializationRegistry + ) { + this.host = host; + this.port = port; + this.serializationRegistry = serializationRegistry; + } + + /** + * Start client. 
+ * + * @return Future that resolves when client channel is opened. + */ + public CompletableFuture<NettySender> start(Bootstrap bootstrap) { + bootstrap.connect(host, port).addListener((ChannelFutureListener) connect -> { + this.channel = (SocketChannel) connect.channel(); + + if (connect.isSuccess()) + clientFuture.complete(new NettySender(channel, serializationRegistry)); + else + clientFuture.completeExceptionally(connect.cause()); + }); + + return clientFuture; + } + + /** + * @return Client start future. + */ + public CompletableFuture<NettySender> sender() { + return clientFuture; + } + + /** + * Stop client. + */ + public void stop() { + this.channel.close().awaitUninterruptibly(); + } + + /** + * @return {@code true} if client failed to connect to remote server, {@code false} otherwise. Review comment: ```suggestion * @return {@code true} if the client has failed to connect to the remote server, {@code false} otherwise. ``` ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.stream.ChunkedWriteHandler; +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Netty client channel wrapper. + */ +public class NettyClient { + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** Destination host. */ + private final String host; + + /** Destination port. */ + private final int port; + + /** Future that resolves when client channel is opened. */ + private final CompletableFuture<NettySender> clientFuture = new CompletableFuture<>(); + + /** Client channel. */ + private SocketChannel channel; + + public NettyClient( + String host, + int port, + MessageSerializationRegistry serializationRegistry + ) { + this.host = host; + this.port = port; + this.serializationRegistry = serializationRegistry; + } + + /** + * Start client. + * + * @return Future that resolves when client channel is opened. + */ + public CompletableFuture<NettySender> start(Bootstrap bootstrap) { + bootstrap.connect(host, port).addListener((ChannelFutureListener) connect -> { + this.channel = (SocketChannel) connect.channel(); + + if (connect.isSuccess()) + clientFuture.complete(new NettySender(channel, serializationRegistry)); + else + clientFuture.completeExceptionally(connect.cause()); + }); + + return clientFuture; + } + + /** + * @return Client start future. 
+ */ + public CompletableFuture<NettySender> sender() { + return clientFuture; + } + + /** + * Stop client. + */ + public void stop() { + this.channel.close().awaitUninterruptibly(); + } + + /** + * @return {@code true} if client failed to connect to remote server, {@code false} otherwise. + */ + public boolean failedToConnect() { + return clientFuture.isCompletedExceptionally(); + } + + /** + * @return {@code true} if client lost connection, {@code false} otherwise. Review comment: ```suggestion * @return {@code true} if the client has lost the connection, {@code false} otherwise. ``` ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BiConsumer; +import org.apache.ignite.lang.IgniteInternalException; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Class that manages connections both incoming and outgoing. + */ +public class ConnectionManager { + /** Latest version of the direct marshalling protocol. */ + public static final byte DIRECT_PROTOCOL_VERSION = 1; + + /** Client bootstrap. */ + private final Bootstrap clientBootstrap; + + /** Client socket channel handler event loop group. */ + private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup(); + + /** Server. */ + private final NettyServer server; + + /** Channels. */ + private final Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>(); + + /** Clients. */ + private final Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>(); + + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** Message listeners. 
*/ + private final List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = new CopyOnWriteArrayList<>(new ArrayList<>()); + + public ConnectionManager(int port, MessageSerializationRegistry provider) { + this.serializationRegistry = provider; + this.server = new NettyServer(port, this::onNewIncomingChannel, this::onMessage, serializationRegistry); + this.clientBootstrap = NettyClient.createBootstrap(clientWorkerGroup, serializationRegistry, this::onMessage); + } + + /** + * Start server. + * + * @throws IgniteInternalException If failed to start. + */ + public void start() throws IgniteInternalException { + try { + server.start().join(); + } + catch (CompletionException e) { + Throwable cause = e.getCause(); + throw new IgniteInternalException("Failed to start server: " + cause.getMessage(), cause); + } + } + + /** + * @return Server local address. + */ + public InetSocketAddress getLocalAddress() { + return server.address(); + } + + /** + * Get a {@link NettySender}, that sends data from this node to another node with the specified address. Review comment: ```suggestion * Gets a {@link NettySender}, that sends data from this node to another node with the specified address. ``` ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.stream.ChunkedWriteHandler; +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Netty client channel wrapper. + */ +public class NettyClient { + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** Destination host. */ + private final String host; + + /** Destination port. */ + private final int port; + + /** Future that resolves when client channel is opened. */ + private final CompletableFuture<NettySender> clientFuture = new CompletableFuture<>(); + + /** Client channel. */ + private SocketChannel channel; + + public NettyClient( + String host, + int port, + MessageSerializationRegistry serializationRegistry + ) { + this.host = host; + this.port = port; + this.serializationRegistry = serializationRegistry; + } + + /** + * Start client. + * + * @return Future that resolves when client channel is opened. 
+ */ + public CompletableFuture<NettySender> start(Bootstrap bootstrap) { + bootstrap.connect(host, port).addListener((ChannelFutureListener) connect -> { + this.channel = (SocketChannel) connect.channel(); + + if (connect.isSuccess()) + clientFuture.complete(new NettySender(channel, serializationRegistry)); + else + clientFuture.completeExceptionally(connect.cause()); + }); + + return clientFuture; + } + + /** + * @return Client start future. + */ + public CompletableFuture<NettySender> sender() { + return clientFuture; + } + + /** + * Stop client. + */ + public void stop() { + this.channel.close().awaitUninterruptibly(); + } + + /** + * @return {@code true} if client failed to connect to remote server, {@code false} otherwise. + */ + public boolean failedToConnect() { + return clientFuture.isCompletedExceptionally(); + } + + /** + * @return {@code true} if client lost connection, {@code false} otherwise. + */ + public boolean isDisconnected() { + return channel != null && !channel.isOpen(); + } + + /** + * Creates {@link Bootstrap} for clients, providing channel handlers and options. Review comment: ```suggestion * Creates a {@link Bootstrap} for clients, providing channel handlers and options. ``` ########## File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.scalecube; + +import io.scalecube.cluster.transport.api.Message; +import io.scalecube.cluster.transport.api.Transport; +import io.scalecube.net.Address; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.Objects; +import org.apache.ignite.lang.IgniteInternalException; +import org.apache.ignite.lang.IgniteLogger; +import org.apache.ignite.network.internal.netty.ConnectionManager; +import org.apache.ignite.network.message.NetworkMessage; +import org.apache.ignite.network.scalecube.message.ScaleCubeMessage; +import reactor.core.Disposable; +import reactor.core.Disposables; +import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; + +/** + * ScaleCube transport over {@link ConnectionManager}. + */ +public class ScaleCubeDirectMarshallerTransport implements Transport { + /** Logger. */ + private static final IgniteLogger LOG = IgniteLogger.forClass(Transport.class); + + /** Message subject. */ + private final DirectProcessor<Message> subject = DirectProcessor.create(); + + /** Message sink. 
*/ + private final FluxSink<Message> sink = subject.sink(); + + /** Close handler */ + private final MonoProcessor<Void> stop = MonoProcessor.create(); + + /** On stop. */ + private final MonoProcessor<Void> onStop = MonoProcessor.create(); + + /** Connection manager. */ + private final ConnectionManager connectionManager; + + /** Node address. */ + private final Address address; + + /** + * Constructor. + * + * @param connectionManager Connection manager. + */ + public ScaleCubeDirectMarshallerTransport(ConnectionManager connectionManager) { + this.connectionManager = connectionManager; + this.connectionManager.addListener(this::onMessage); + this.address = prepareAddress(connectionManager.getLocalAddress()); + // Setup cleanup + stop.then(doStop()) + .doFinally(s -> onStop.onComplete()) + .subscribe( + null, + ex -> LOG.warn("[{0}][doStop] Exception occurred: {1}", address, ex.toString()) + ); + } + + /** + * Convert {@link InetSocketAddress} to {@link Address}. + * + * @param addr Address. + * @return ScaleCube address. + */ + private static Address prepareAddress(InetSocketAddress addr) { + InetAddress address = addr.getAddress(); + + int port = addr.getPort(); + + if (address.isAnyLocalAddress()) + return Address.create(Address.getLocalIpAddress().getHostAddress(), port); + else + return Address.create(address.getHostAddress(), port); + } + + /** + * Cleanup resources on stop. + * + * @return A mono, that resolves when the stop operation is finished. 
+ */ + private Mono<Void> doStop() { + return Mono.defer(() -> { + LOG.info("[{0}][doStop] Stopping", address); + + // Complete incoming messages observable + sink.complete(); + + LOG.info("[{0}][doStop] Stopped", address); + return Mono.empty(); + }); + } + + /** {@inheritDoc} */ + @Override public Address address() { + return address; + } + + /** {@inheritDoc} */ + @Override public Mono<Transport> start() { + return Mono.just(this); + } + + /** {@inheritDoc} */ + @Override public Mono<Void> stop() { + return doStop(); + } + + /** {@inheritDoc} */ + @Override public boolean isStopped() { + return onStop.isDisposed(); + } + + /** {@inheritDoc} */ + @Override public Mono<Void> send(Address address, Message message) { + var addr = InetSocketAddress.createUnresolved(address.host(), address.port()); + return Mono.defer(() -> Mono.fromFuture(connectionManager.channel(addr))) + .flatMap(client -> { + return Mono.fromFuture(client.send(fromMessage(message))); + }); + } + + /** + * Handles new network messages from {@link #connectionManager}. + * + * @param source Message source. + * @param msg Network message. + */ + private void onMessage(InetSocketAddress source, NetworkMessage msg) { + Message message = fromNetworkMessage(msg); + + if (message != null) + sink.next(message); + } + + /** + * Wrap ScaleCube {@link Message} with {@link NetworkMessage}. + * + * @param message ScaleCube message. + * @return Netowork message that wraps ScaleCube message. + * @throws IgniteInternalException If failed to write message to ObjectOutputStream. 
+ */ + private NetworkMessage fromMessage(Message message) throws IgniteInternalException { + Object dataObj = message.data(); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + + try (ObjectOutputStream oos = new ObjectOutputStream(stream)) { + oos.writeObject(dataObj); + } + catch (IOException e) { + throw new IgniteInternalException(e); + } + + return new ScaleCubeMessage(stream.toByteArray(), message.headers()); + } + + /** + * Unwrap ScaleCube {@link Message} from {@link NetworkMessage}. + * + * @param networkMessage Network message. + * @return ScaleCube message. + * @throws IgniteInternalException If failed to read ScaleCube message byte array. + */ + private Message fromNetworkMessage(NetworkMessage networkMessage) throws IgniteInternalException { + if (networkMessage instanceof ScaleCubeMessage) { + ScaleCubeMessage msg = (ScaleCubeMessage) networkMessage; + + Map<String, String> headers = msg.getHeaders(); + + Object obj; + + try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(msg.getArray()))) { + obj = ois.readObject(); + } + catch (Exception e) { + throw new IgniteInternalException(e); + } + + return Message.withHeaders(headers).data(obj).build(); + } + return null; + } + + /** {@inheritDoc} */ + @Override public Mono<Message> requestResponse(Address address, final Message request) { Review comment: ```suggestion @Override public Mono<Message> requestResponse(Address address, Message request) { ``` ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/ConnectionManager.java ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.BiConsumer; +import org.apache.ignite.lang.IgniteInternalException; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Class that manages connections both incoming and outgoing. + */ +public class ConnectionManager { + /** Latest version of the direct marshalling protocol. */ + public static final byte DIRECT_PROTOCOL_VERSION = 1; + + /** Client bootstrap. */ + private final Bootstrap clientBootstrap; + + /** Client socket channel handler event loop group. */ + private final EventLoopGroup clientWorkerGroup = new NioEventLoopGroup(); + + /** Server. */ + private final NettyServer server; + + /** Channels. */ + private final Map<InetSocketAddress, NettySender> channels = new ConcurrentHashMap<>(); + + /** Clients. 
*/ + private final Map<InetSocketAddress, NettyClient> clients = new ConcurrentHashMap<>(); + + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** Message listeners. */ + private final List<BiConsumer<InetSocketAddress, NetworkMessage>> listeners = new CopyOnWriteArrayList<>(new ArrayList<>()); + + public ConnectionManager(int port, MessageSerializationRegistry provider) { + this.serializationRegistry = provider; + this.server = new NettyServer(port, this::onNewIncomingChannel, this::onMessage, serializationRegistry); + this.clientBootstrap = NettyClient.createBootstrap(clientWorkerGroup, serializationRegistry, this::onMessage); + } + + /** + * Start server. Review comment: ```suggestion * Starts the server. ``` ########## File path: modules/network/src/integrationTest/java/org/apache/ignite/network/scalecube/NettyTestRunner.java ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.network.scalecube; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.nio.NioEventLoopGroup; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import org.apache.ignite.network.TestMessage; +import org.apache.ignite.network.TestMessageSerializationFactory; +import org.apache.ignite.network.internal.netty.NettyClient; +import org.apache.ignite.network.internal.netty.NettySender; +import org.apache.ignite.network.internal.netty.NettyServer; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@Deprecated +// Only for WIP purposes Review comment: let's remove this class ########## File path: modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.network; + +import java.net.InetSocketAddress; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.network.internal.netty.ConnectionManager; +import org.apache.ignite.network.internal.netty.NettySender; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Tests for {@link ConnectionManager}. + */ +public class ConnectionManagerTest { + /** Started connection managers. */ + private final List<ConnectionManager> startedManagers = new ArrayList<>(); + + /** */ + @AfterEach + void tearDown() { + startedManagers.forEach(ConnectionManager::stop); + + startedManagers.clear(); + } + + /** + * Tests that a message is sent successfuly using ConnectionManager. + * + * @throws Exception If failed. + */ + @Test + public void testSentSuccessfully() throws Exception { + String msgText = "test"; + + var latch = new CountDownLatch(1); + + int port1 = 4000; + int port2 = 4001; + + var manager1 = startManager(port1); + var manager2 = startManager(port2); + + manager2.addListener((address, message) -> { + if (message instanceof TestMessage && msgText.equals(((TestMessage) message).msg())) + latch.countDown(); + }); + + NettySender sender = manager1.channel(address(port2)).get(); + + TestMessage testMessage = new TestMessage(msgText, new HashMap<>()); + + sender.send(testMessage).join(); + + latch.await(3, TimeUnit.SECONDS); + } + + /** + * Tests that after a channel was closed, a new channel is opened upon a request. + * + * @throws Exception If failed. 
+ */ + @Test + public void testCanReconnectAfterFail() throws Exception { + String msgText = "test"; + + var latch = new CountDownLatch(1); + + int port1 = 4000; + int port2 = 4001; + + var manager1 = startManager(port1); + var manager2 = startManager(port2); + + NettySender sender = manager1.channel(address(port2)).get(); + + TestMessage testMessage = new TestMessage(msgText, new HashMap<>()); + + manager2.stop(); + + final var finalSender = sender; + + assertThrows(ClosedChannelException.class, () -> { + try { + finalSender.send(testMessage).join(); + } + catch (CompletionException e) { + throw e.getCause(); + } + }); + + manager2 = startManager(port2); + + manager2.addListener((address, message) -> { + if (message instanceof TestMessage && msgText.equals(((TestMessage) message).msg())) + latch.countDown(); + }); + + sender = manager1.channel(address(port2)).get(); + + sender.send(testMessage).join(); + } + + /** + * Create an unresolved {@link InetSocketAddress} with "localhost" as a host. + * + * @param port Port. + * @return Address. + */ + private InetSocketAddress address(int port) { + return InetSocketAddress.createUnresolved("localhost", port); Review comment: can you simply use `new InetSocketAddress(port)` instead? ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyClient.java ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.stream.ChunkedWriteHandler; +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Netty client channel wrapper. + */ +public class NettyClient { + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** Destination host. */ + private final String host; + + /** Destination port. */ + private final int port; + + /** Future that resolves when client channel is opened. */ + private final CompletableFuture<NettySender> clientFuture = new CompletableFuture<>(); + + /** Client channel. */ + private SocketChannel channel; + + public NettyClient( + String host, + int port, + MessageSerializationRegistry serializationRegistry + ) { + this.host = host; + this.port = port; + this.serializationRegistry = serializationRegistry; + } + + /** + * Start client. + * + * @return Future that resolves when client channel is opened. 
+ */ + public CompletableFuture<NettySender> start(Bootstrap bootstrap) { + bootstrap.connect(host, port).addListener((ChannelFutureListener) connect -> { + this.channel = (SocketChannel) connect.channel(); + + if (connect.isSuccess()) + clientFuture.complete(new NettySender(channel, serializationRegistry)); + else + clientFuture.completeExceptionally(connect.cause()); + }); + + return clientFuture; + } + + /** + * @return Client start future. + */ + public CompletableFuture<NettySender> sender() { + return clientFuture; + } + + /** + * Stop client. + */ + public void stop() { + this.channel.close().awaitUninterruptibly(); + } + + /** + * @return {@code true} if client failed to connect to remote server, {@code false} otherwise. + */ + public boolean failedToConnect() { + return clientFuture.isCompletedExceptionally(); + } + + /** + * @return {@code true} if client lost connection, {@code false} otherwise. + */ + public boolean isDisconnected() { + return channel != null && !channel.isOpen(); + } + + /** + * Creates {@link Bootstrap} for clients, providing channel handlers and options. + * + * @param eventLoopGroup Event loop group for channel handling. + * @param serializationRegistry Serialization registry. + * @param messageListener Message listener. + * @return Bootstrap for clients. + */ + public static Bootstrap createBootstrap( Review comment: I think this method should be moved to the `ConnectionManager` ########## File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java ########## @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.scalecube; + +import io.scalecube.cluster.transport.api.Message; +import io.scalecube.cluster.transport.api.Transport; +import io.scalecube.net.Address; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.Objects; +import org.apache.ignite.lang.IgniteInternalException; +import org.apache.ignite.lang.IgniteLogger; +import org.apache.ignite.network.internal.netty.ConnectionManager; +import org.apache.ignite.network.message.NetworkMessage; +import org.apache.ignite.network.scalecube.message.ScaleCubeMessage; +import reactor.core.Disposable; +import reactor.core.Disposables; +import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; + +/** + * ScaleCube transport over {@link ConnectionManager}. + */ +public class ScaleCubeDirectMarshallerTransport implements Transport { + /** Logger. */ + private static final IgniteLogger LOG = IgniteLogger.forClass(Transport.class); + + /** Message subject. */ + private final DirectProcessor<Message> subject = DirectProcessor.create(); + + /** Message sink. 
*/ + private final FluxSink<Message> sink = subject.sink(); + + /** Close handler */ + private final MonoProcessor<Void> stop = MonoProcessor.create(); + + /** On stop. */ + private final MonoProcessor<Void> onStop = MonoProcessor.create(); + + /** Connection manager. */ + private final ConnectionManager connectionManager; + + /** Node address. */ + private final Address address; + + /** + * Constructor. + * + * @param connectionManager Connection manager. + */ + public ScaleCubeDirectMarshallerTransport(ConnectionManager connectionManager) { + this.connectionManager = connectionManager; + this.connectionManager.addListener(this::onMessage); + this.address = prepareAddress(connectionManager.getLocalAddress()); + // Setup cleanup + stop.then(doStop()) + .doFinally(s -> onStop.onComplete()) + .subscribe( + null, + ex -> LOG.warn("[{0}][doStop] Exception occurred: {1}", address, ex.toString()) + ); + } + + /** + * Convert {@link InetSocketAddress} to {@link Address}. + * + * @param addr Address. + * @return ScaleCube address. + */ + private static Address prepareAddress(InetSocketAddress addr) { + InetAddress address = addr.getAddress(); + + int port = addr.getPort(); + + if (address.isAnyLocalAddress()) + return Address.create(Address.getLocalIpAddress().getHostAddress(), port); + else + return Address.create(address.getHostAddress(), port); + } + + /** + * Cleanup resources on stop. + * + * @return A mono, that resolves when the stop operation is finished. 
+ */ + private Mono<Void> doStop() { + return Mono.defer(() -> { + LOG.info("[{0}][doStop] Stopping", address); + + // Complete incoming messages observable + sink.complete(); + + LOG.info("[{0}][doStop] Stopped", address); + return Mono.empty(); + }); + } + + /** {@inheritDoc} */ + @Override public Address address() { + return address; + } + + /** {@inheritDoc} */ + @Override public Mono<Transport> start() { + return Mono.just(this); + } + + /** {@inheritDoc} */ + @Override public Mono<Void> stop() { + return doStop(); + } + + /** {@inheritDoc} */ + @Override public boolean isStopped() { + return onStop.isDisposed(); + } + + /** {@inheritDoc} */ + @Override public Mono<Void> send(Address address, Message message) { + var addr = InetSocketAddress.createUnresolved(address.host(), address.port()); + return Mono.defer(() -> Mono.fromFuture(connectionManager.channel(addr))) + .flatMap(client -> { Review comment: I would suggest handling the futures first and then wrapping them in `Mono`; it looks quite strange otherwise. For example: ``` return Mono.fromFuture(() -> connectionManager.channel(addr) .thenCompose(client -> client.send(fromMessage(message))) ); ``` ########## File path: modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network; + +import java.net.InetSocketAddress; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.network.internal.netty.ConnectionManager; +import org.apache.ignite.network.internal.netty.NettySender; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Tests for {@link ConnectionManager}. + */ +public class ConnectionManagerTest { + /** Started connection managers. */ + private final List<ConnectionManager> startedManagers = new ArrayList<>(); + + /** */ + @AfterEach + void tearDown() { + startedManagers.forEach(ConnectionManager::stop); + + startedManagers.clear(); + } + + /** + * Tests that a message is sent successfuly using ConnectionManager. Review comment: ```suggestion * Tests that a message is sent successfully using the ConnectionManager. ``` ########## File path: modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network; + +import java.net.InetSocketAddress; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.network.internal.netty.ConnectionManager; +import org.apache.ignite.network.internal.netty.NettySender; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Tests for {@link ConnectionManager}. + */ +public class ConnectionManagerTest { + /** Started connection managers. */ + private final List<ConnectionManager> startedManagers = new ArrayList<>(); + + /** */ + @AfterEach + void tearDown() { + startedManagers.forEach(ConnectionManager::stop); + + startedManagers.clear(); Review comment: No need to call `clear`; JUnit classes get re-instantiated on every test. ########## File path: modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network; + +import java.net.InetSocketAddress; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.network.internal.netty.ConnectionManager; +import org.apache.ignite.network.internal.netty.NettySender; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Tests for {@link ConnectionManager}. + */ +public class ConnectionManagerTest { + /** Started connection managers. */ + private final List<ConnectionManager> startedManagers = new ArrayList<>(); + + /** */ + @AfterEach + void tearDown() { + startedManagers.forEach(ConnectionManager::stop); + + startedManagers.clear(); + } + + /** + * Tests that a message is sent successfuly using ConnectionManager. + * + * @throws Exception If failed. 
+ */ + @Test + public void testSentSuccessfully() throws Exception { + String msgText = "test"; + + var latch = new CountDownLatch(1); + + int port1 = 4000; + int port2 = 4001; + + var manager1 = startManager(port1); + var manager2 = startManager(port2); + + manager2.addListener((address, message) -> { + if (message instanceof TestMessage && msgText.equals(((TestMessage) message).msg())) + latch.countDown(); + }); + + NettySender sender = manager1.channel(address(port2)).get(); + + TestMessage testMessage = new TestMessage(msgText, new HashMap<>()); + + sender.send(testMessage).join(); + + latch.await(3, TimeUnit.SECONDS); + } + + /** + * Tests that after a channel was closed, a new channel is opened upon a request. + * + * @throws Exception If failed. + */ + @Test + public void testCanReconnectAfterFail() throws Exception { + String msgText = "test"; + + var latch = new CountDownLatch(1); + + int port1 = 4000; + int port2 = 4001; + + var manager1 = startManager(port1); + var manager2 = startManager(port2); + + NettySender sender = manager1.channel(address(port2)).get(); + + TestMessage testMessage = new TestMessage(msgText, new HashMap<>()); + + manager2.stop(); + + final var finalSender = sender; Review comment: what's this for? ########## File path: modules/network/src/integrationTest/java/org/apache/ignite/network/ConnectionManagerTest.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network; + +import java.net.InetSocketAddress; +import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.network.internal.netty.ConnectionManager; +import org.apache.ignite.network.internal.netty.NettySender; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Tests for {@link ConnectionManager}. + */ +public class ConnectionManagerTest { + /** Started connection managers. */ + private final List<ConnectionManager> startedManagers = new ArrayList<>(); + + /** */ + @AfterEach + void tearDown() { + startedManagers.forEach(ConnectionManager::stop); + + startedManagers.clear(); + } + + /** + * Tests that a message is sent successfuly using ConnectionManager. + * + * @throws Exception If failed. 
+ */ + @Test + public void testSentSuccessfully() throws Exception { + String msgText = "test"; + + var latch = new CountDownLatch(1); + + int port1 = 4000; + int port2 = 4001; + + var manager1 = startManager(port1); + var manager2 = startManager(port2); + + manager2.addListener((address, message) -> { + if (message instanceof TestMessage && msgText.equals(((TestMessage) message).msg())) Review comment: maybe it would be better to use a `CompletableFuture` instead of a latch, so that we can check the message using an assertion? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
