SammyVimes commented on a change in pull request #102: URL: https://github.com/apache/ignite-3/pull/102#discussion_r619694520
########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/NettyServer.java ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.stream.ChunkedWriteHandler; +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * Netty server channel wrapper. + */ +public class NettyServer { + /** {@link ServerSocketChannel} bootstrapper. */ + private final ServerBootstrap bootstrap = new ServerBootstrap(); + + /** Socket accepter event loop group. 
*/ + private final NioEventLoopGroup bossGroup = new NioEventLoopGroup(); + + /** Socket handler event loop group. */ + private final NioEventLoopGroup workerGroup = new NioEventLoopGroup(); + + /** Server port. */ + private final int port; + + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** Incoming message listener. */ + private final BiConsumer<InetSocketAddress, NetworkMessage> messageListener; + + /** Server socket channel. */ + private ServerSocketChannel channel; + + /** New connections listener. */ + private final Consumer<SocketChannel> newConnectionListener; + + /** + * Constructor. + * + * @param port Server port. + * @param newConnectionListener New connections listener. + * @param messageListener Message listener. + * @param serializationRegistry Serialization registry. + */ + public NettyServer( + int port, + Consumer<SocketChannel> newConnectionListener, + BiConsumer<InetSocketAddress, NetworkMessage> messageListener, + MessageSerializationRegistry serializationRegistry + ) { + this.port = port; + this.newConnectionListener = newConnectionListener; + this.messageListener = messageListener; + this.serializationRegistry = serializationRegistry; + } + + /** + * Start server. + * + * @return Future that resolves when server is successfuly started. + */ + public CompletableFuture<Void> start() { + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer<SocketChannel>() { + /** {@inheritDoc} */ + @Override public void initChannel(SocketChannel ch) + throws Exception { + ch.pipeline().addLast( + /** + * Decoder that uses {@link org.apache.ignite.network.internal.MessageReader} + * to read chunked data. + */ + new InboundDecoder(serializationRegistry), + /** Handles decoded {@link NetworkMessage}s. 
*/ + new MessageHandler(messageListener), + /** + * Encoder that uses {@link org.apache.ignite.network.internal.MessageWriter} + * to write chunked data. + */ + new ChunkedWriteHandler() + ); + + newConnectionListener.accept(ch); + } + }) + /** + * The maximum queue length for incoming connection indications (a request to connect) is set + * to the backlog parameter. If a connection indication arrives when the queue is full, + * the connection is refused. + */ + .option(ChannelOption.SO_BACKLOG, 128) + /** + * When the keepalive option is set for a TCP socket and no data has been exchanged across the socket + * in either direction for 2 hours (NOTE: the actual value is implementation dependent), + * TCP automatically sends a keepalive probe to the peer. + */ + .childOption(ChannelOption.SO_KEEPALIVE, true); + + CompletableFuture<Void> serverStartFuture = new CompletableFuture<>(); + + ChannelFuture bindFuture = bootstrap.bind(port); + + bindFuture.addListener(bind -> { + this.channel = (ServerSocketChannel) bindFuture.channel(); + + if (bind.isSuccess()) + serverStartFuture.complete(null); + else { + Throwable cause = bind.cause(); + serverStartFuture.completeExceptionally(cause); + } + + // Shutdown event loops on server stop. + channel.closeFuture().addListener(close -> { + workerGroup.shutdownGracefully(); Review comment: I don't think so, as it happens in a listener and there is no place (or need) to actually wait for the event loop to shut down. ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.ignite.network.internal.MessageReader; +import org.apache.ignite.network.internal.direct.DirectMessageReader; +import org.apache.ignite.network.message.MessageDeserializer; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * + */ +public class InboundDecoder extends ByteToMessageDecoder { + /** Message reader channel attribute key. */ + private static final AttributeKey<MessageReader> READER_KEY = AttributeKey.valueOf("READER"); + + /** Message deserializer channel attribute key. */ + private static final AttributeKey<MessageDeserializer<NetworkMessage>> DESERIALIZER_KEY = AttributeKey.valueOf("DESERIALIZER"); + + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** + * Constructor. + * + * @param serializationRegistry Serialization registry. 
+ */ + public InboundDecoder(MessageSerializationRegistry serializationRegistry) { + this.serializationRegistry = serializationRegistry; + } + + /** {@inheritDoc} */ + @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { + ByteBuffer buffer = in.nioBuffer(); + + Attribute<MessageReader> readerAttr = ctx.channel().attr(READER_KEY); + MessageReader reader = readerAttr.get(); + + if (reader == null) + readerAttr.set(reader = new DirectMessageReader(serializationRegistry, (byte) 1)); Review comment: I think there isn't, but no harm in changing it, sure ########## File path: modules/network/src/main/java/org/apache/ignite/network/internal/netty/InboundDecoder.java ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.network.internal.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.ignite.network.internal.MessageReader; +import org.apache.ignite.network.internal.direct.DirectMessageReader; +import org.apache.ignite.network.message.MessageDeserializer; +import org.apache.ignite.network.message.MessageSerializationRegistry; +import org.apache.ignite.network.message.NetworkMessage; + +/** + * + */ +public class InboundDecoder extends ByteToMessageDecoder { + /** Message reader channel attribute key. */ + private static final AttributeKey<MessageReader> READER_KEY = AttributeKey.valueOf("READER"); + + /** Message deserializer channel attribute key. */ + private static final AttributeKey<MessageDeserializer<NetworkMessage>> DESERIALIZER_KEY = AttributeKey.valueOf("DESERIALIZER"); + + /** Serialization registry. */ + private final MessageSerializationRegistry serializationRegistry; + + /** + * Constructor. + * + * @param serializationRegistry Serialization registry. 
+ */ + public InboundDecoder(MessageSerializationRegistry serializationRegistry) { + this.serializationRegistry = serializationRegistry; + } + + /** {@inheritDoc} */ + @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { + ByteBuffer buffer = in.nioBuffer(); + + Attribute<MessageReader> readerAttr = ctx.channel().attr(READER_KEY); + MessageReader reader = readerAttr.get(); + + if (reader == null) + readerAttr.set(reader = new DirectMessageReader(serializationRegistry, (byte) 1)); Review comment: Sure, I'll move it to ConnectionManager (just like it was in GridIoManager in 2.x) ########## File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/message/ScaleCubeMessageSerializationFactory.java ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.network.scalecube.message; + +import java.util.Map; +import org.apache.ignite.network.internal.MessageReader; +import org.apache.ignite.network.message.MessageDeserializer; +import org.apache.ignite.network.message.MessageMappingException; +import org.apache.ignite.network.message.MessageSerializationFactory; +import org.apache.ignite.network.message.MessageSerializer; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; + +public class ScaleCubeMessageSerializationFactory implements MessageSerializationFactory<ScaleCubeMessage> { + /** {@inheritDoc} */ + @Override public MessageDeserializer<ScaleCubeMessage> createDeserializer() { + return new MessageDeserializer<ScaleCubeMessage>() { + + ScaleCubeMessage obj; + + byte[] array; + + Map<String, String> headers; + + /** {@inheritDoc} */ + @Override public boolean readMessage(MessageReader reader) throws MessageMappingException { + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + array = reader.readByteArray("array"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + //noinspection fallthrough + case 1: + headers = reader.readMap("headers", MessageCollectionItemType.STRING, MessageCollectionItemType.STRING, false); Review comment: No, why? The order is lexicographic ########## File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeDirectMarshallerTransport.java ########## @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.scalecube; + +import io.scalecube.cluster.transport.api.Message; +import io.scalecube.cluster.transport.api.Transport; +import io.scalecube.net.Address; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.Objects; +import org.apache.ignite.lang.IgniteInternalException; +import org.apache.ignite.network.internal.netty.ConnectionManager; +import org.apache.ignite.network.message.NetworkMessage; +import org.apache.ignite.network.scalecube.message.ScaleCubeMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.Disposable; +import reactor.core.Disposables; +import reactor.core.publisher.DirectProcessor; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; + +/** + * ScaleCube transport over {@link ConnectionManager}. + */ +public class ScaleCubeDirectMarshallerTransport implements Transport { + /** Logger. */ + private static final Logger LOGGER = LoggerFactory.getLogger(Transport.class); + + /** Message subject. */ + private final DirectProcessor<Message> subject = DirectProcessor.create(); + + /** Message sink. 
*/ + private final FluxSink<Message> sink = subject.sink(); + + /** Close handler */ + private final MonoProcessor<Void> stop = MonoProcessor.create(); + + /** On stop. */ + private final MonoProcessor<Void> onStop = MonoProcessor.create(); + + /** Connection manager. */ + private final ConnectionManager connectionManager; + + /** Node address. */ + private final Address address; + + /** + * Constructor. + * + * @param connectionManager Connection manager. + */ + public ScaleCubeDirectMarshallerTransport(ConnectionManager connectionManager) { + this.connectionManager = connectionManager; + this.connectionManager.addListener(this::onMessage); + this.address = prepareAddress(connectionManager.getLocalAddress()); + // Setup cleanup + stop.then(doStop()) + .doFinally(s -> onStop.onComplete()) + .subscribe( + null, + ex -> LOGGER.warn("[{}][doStop] Exception occurred: {}", address, ex.toString()) + ); + } + + /** + * Convert {@link InetSocketAddress} to {@link Address}. + * + * @param addr Address. + * @return ScaleCube address. + */ + private static Address prepareAddress(InetSocketAddress addr) { + InetAddress address = addr.getAddress(); + + int port = addr.getPort(); + + if (address.isAnyLocalAddress()) Review comment: I'm not sure if it's suitable for IPv6 addresses. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
