sashapolo commented on a change in pull request #70: URL: https://github.com/apache/ignite-3/pull/70#discussion_r598616181
########## File path: modules/network/src/main/java/org/apache/ignite/network/Network.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network; + +import java.util.HashMap; +import java.util.Map; + +/** + * Entry point for network module. + */ +public class Network { + /** Message mappers map, message type -> message mapper. */ + private final Map<Short, MessageMapper> messageMappers = new HashMap<>(); Review comment: I can see that you are using raw types for `MessageMapper`s a lot. Moreover, the actual generic type is not needed anywhere. I think this is a strong indication that `MessageMapper` should not be a generic interface ########## File path: modules/network/src/main/java/org/apache/ignite/network/Network.java ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network; + +import java.util.HashMap; +import java.util.Map; + +/** + * Entry point for network module. + */ +public class Network { + /** Message mappers map, message type -> message mapper. */ + private final Map<Short, MessageMapper> messageMappers = new HashMap<>(); + + /** Message handlers. */ + private final MessageHandlerHolder messageHandlerHolder = new MessageHandlerHolder(); + + /** Cluster factory. */ + private final NetworkClusterFactory clusterFactory; + + /** + * Constructor. + * @param factory Cluster factory. + */ + public Network(NetworkClusterFactory factory) { + clusterFactory = factory; + } + + /** + * Register message mapper by message type. + * @param type Message type. + * @param messageMapper Message mapper. + */ + public void registerMessageMapper(short type, MessageMapper messageMapper) { + this.messageMappers.put(type, messageMapper); + } + + /** + * Start new cluster. + * @return Network cluster. + */ + public NetworkCluster start() { + NetworkClusterContext context = new NetworkClusterContext(messageHandlerHolder, messageMappers); Review comment: Maybe it would be cleaner to store the context as a field instead of having two fields? ########## File path: modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java ########## @@ -60,10 +60,10 @@ * @param member Network member which should receive the message. * @param msg Message which should be delivered. 
*/ - Future<?> send(NetworkMember member, Object msg); + Future<?> send(NetworkMember member, Request<AckResponse> msg); /** - * Sends asynchronously a message with same guarantees as for {@link #send(NetworkMember, Object)} and + * Sends a message asynchronously with same guarantees as for {@link #send(NetworkMember, NetworkMessage)} and Review comment: The `send` link is incorrect: it points to `send(NetworkMember, NetworkMessage)`, but the method above is declared as `send(NetworkMember, Request<AckResponse>)`. ########## File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageCodec.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.scalecube; + +import io.scalecube.cluster.transport.api.Message; +import io.scalecube.cluster.transport.api.MessageCodec; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.network.MessageMapper; +import org.apache.ignite.network.MessageMappingException; +import org.apache.ignite.network.NetworkMessage; + +/** + * Serializes and deserialized messages in ScaleCube cluster. 
+ */ +class ScaleCubeMessageCodec implements MessageCodec { + /** Header name for {@link NetworkMessage#type()}. */ + public static final String HEADER_MESSAGE_TYPE = "type"; + + /** Map message type -> {@link MessageMapper} */ + private final Map<Short, MessageMapper> messageMapperMap; + + /** + * Constructor. + * @param map Message mapper map. + */ + ScaleCubeMessageCodec(Map<Short, MessageMapper> map) { + messageMapperMap = map; + } + + /** {@inheritDoc} */ + @Override public Message deserialize(InputStream stream) throws Exception { + Message.Builder builder = Message.builder(); + try (ObjectInputStream ois = new ObjectInputStream(stream)) { + // headers + int headersSize = ois.readInt(); + Map<String, String> headers = new HashMap<>(headersSize); + for (int i = 0; i < headersSize; i++) { + String name = ois.readUTF(); + String value = (String) ois.readObject(); + headers.put(name, value); + } + + builder.headers(headers); + + String typeString = headers.get(HEADER_MESSAGE_TYPE); + + if (typeString == null) { + builder.data(ois.readObject()); + return builder.build(); + } + + short type; + try { + type = Short.parseShort(typeString); + } + catch (NumberFormatException e) { + throw new MessageMappingException("Type is not short", e); + } + + MessageMapper mapper = messageMapperMap.get(type); + + assert mapper != null : "No mapper defined for type " + type; + + NetworkMessage message = mapper.readMessage(ois); + builder.data(message); + } + return builder.build(); + } + + /** {@inheritDoc} */ + @Override public void serialize(Message message, OutputStream stream) throws Exception { + final Object data = message.data(); + + if (!(data instanceof NetworkMessage)) { + try (ObjectOutputStream oos = new ObjectOutputStream(stream)) { Review comment: are you sure that you need to close the stream that got passed to this method? 
########## File path: modules/network/src/main/java/org/apache/ignite/network/NetworkClusterContext.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network; + +import java.util.Map; + +/** + * Cluster context. + */ +public class NetworkClusterContext { + /** Message handlers. */ + private final MessageHandlerHolder messageHandlerHolder; + + /** Message mappers, message type -> message mapper. */ + private final Map<Short, MessageMapper> messageMappers; + + /** + * Constructor. + * @param messageHandlerHolder Message handlers. + * @param messageMappers Message mappers map. + */ + public NetworkClusterContext(MessageHandlerHolder messageHandlerHolder, Map<Short, MessageMapper> messageMappers) { + this.messageHandlerHolder = messageHandlerHolder; + this.messageMappers = messageMappers; + } + + /** + * @return Message handlers. + */ + public MessageHandlerHolder messageHandlerHolder() { + return messageHandlerHolder; + } + + /** + * @return Message mappers map. + */ + public Map<Short, MessageMapper> messageMappers() { + return messageMappers; Review comment: Should we wrap this in an `unmodifiableMap`? 
########## File path: modules/network/src/main/java/org/apache/ignite/network/NetworkCluster.java ########## @@ -60,10 +60,10 @@ * @param member Network member which should receive the message. * @param msg Message which should be delivered. */ - Future<?> send(NetworkMember member, Object msg); + Future<?> send(NetworkMember member, Request<AckResponse> msg); Review comment: why not `Future<Void>`? ########## File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkClusterFactory.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network.scalecube; + +import io.scalecube.cluster.Cluster; +import io.scalecube.cluster.ClusterImpl; +import io.scalecube.net.Address; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.ignite.network.MessageHandlerHolder; +import org.apache.ignite.network.NetworkCluster; +import org.apache.ignite.network.NetworkClusterContext; +import org.apache.ignite.network.NetworkClusterFactory; + +/** + * Factory for ScaleCubeNetworkCluster. + */ +public class ScaleCubeNetworkClusterFactory implements NetworkClusterFactory { + /** Unique name of network member. 
*/ + private final String localMemberName; + + /** Local port. */ + private final int localPort; + + /** Network addresses to find another members in cluster. */ + private final List<String> addresses; + + /** + * Member resolver which allows convert {@link org.apache.ignite.network.NetworkMember} to inner ScaleCube type + * and otherwise. + */ + private final ScaleCubeMemberResolver memberResolver; + + /** + * @param localMemberName Unique name of network member. + * @param port Local port. + * @param addresses Network addresses to find another members in cluster. + */ + public ScaleCubeNetworkClusterFactory( + String localMemberName, + int port, + List<String> addresses, + ScaleCubeMemberResolver memberResolver + ) { + this.localMemberName = localMemberName; + localPort = port; + this.addresses = addresses; + this.memberResolver = memberResolver; + } + + /** + * Implementation of {@link NetworkCluster} based on ScaleCube. Review comment: This javadoc is incorrect ########## File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkClusterFactory.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.network.scalecube; + +import io.scalecube.cluster.Cluster; +import io.scalecube.cluster.ClusterImpl; +import io.scalecube.net.Address; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.ignite.network.MessageHandlerHolder; +import org.apache.ignite.network.NetworkCluster; +import org.apache.ignite.network.NetworkClusterContext; +import org.apache.ignite.network.NetworkClusterFactory; + +/** + * Factory for ScaleCubeNetworkCluster. + */ +public class ScaleCubeNetworkClusterFactory implements NetworkClusterFactory { + /** Unique name of network member. */ + private final String localMemberName; + + /** Local port. */ + private final int localPort; + + /** Network addresses to find another members in cluster. */ + private final List<String> addresses; + + /** + * Member resolver which allows convert {@link org.apache.ignite.network.NetworkMember} to inner ScaleCube type + * and otherwise. + */ + private final ScaleCubeMemberResolver memberResolver; + + /** + * @param localMemberName Unique name of network member. + * @param port Local port. + * @param addresses Network addresses to find another members in cluster. + */ + public ScaleCubeNetworkClusterFactory( + String localMemberName, + int port, + List<String> addresses, + ScaleCubeMemberResolver memberResolver + ) { + this.localMemberName = localMemberName; + localPort = port; + this.addresses = addresses; + this.memberResolver = memberResolver; + } + + /** + * Implementation of {@link NetworkCluster} based on ScaleCube. + * + * @param memberResolver Member resolve which allows convert {@link org.apache.ignite.network.NetworkMember} to + * inner ScaleCube type and otherwise. + * @param messageHandlerHolder Holder of all cluster message handlers. + * @return {@link NetworkCluster} instance. 
+ */ + @Override public NetworkCluster startCluster(NetworkClusterContext clusterContext) { + final MessageHandlerHolder handlerHolder = clusterContext.messageHandlerHolder(); + Cluster cluster = new ClusterImpl() + .handler(cl -> { + return new ScaleCubeMessageHandler(cl, memberResolver, handlerHolder); + }) + .config(opts -> opts + .memberAlias(localMemberName) + .transport(trans -> { + return trans.port(localPort) + .connectTimeout(1000_000_000) Review comment: I think this value should be a named constant or a configurable parameter ########## File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeMessageCodec.java ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.network.scalecube; + +import io.scalecube.cluster.transport.api.Message; +import io.scalecube.cluster.transport.api.MessageCodec; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import org.apache.ignite.network.MessageMapper; +import org.apache.ignite.network.MessageMappingException; +import org.apache.ignite.network.NetworkMessage; + +/** + * Serializes and deserialized messages in ScaleCube cluster. + */ +class ScaleCubeMessageCodec implements MessageCodec { + /** Header name for {@link NetworkMessage#type()}. */ + public static final String HEADER_MESSAGE_TYPE = "type"; + + /** Map message type -> {@link MessageMapper} */ + private final Map<Short, MessageMapper> messageMapperMap; + + /** + * Constructor. + * @param map Message mapper map. + */ + ScaleCubeMessageCodec(Map<Short, MessageMapper> map) { + messageMapperMap = map; + } + + /** {@inheritDoc} */ + @Override public Message deserialize(InputStream stream) throws Exception { + Message.Builder builder = Message.builder(); + try (ObjectInputStream ois = new ObjectInputStream(stream)) { + // headers + int headersSize = ois.readInt(); + Map<String, String> headers = new HashMap<>(headersSize); + for (int i = 0; i < headersSize; i++) { + String name = ois.readUTF(); + String value = (String) ois.readObject(); + headers.put(name, value); + } + + builder.headers(headers); + + String typeString = headers.get(HEADER_MESSAGE_TYPE); + + if (typeString == null) { + builder.data(ois.readObject()); + return builder.build(); + } + + short type; + try { + type = Short.parseShort(typeString); + } + catch (NumberFormatException e) { + throw new MessageMappingException("Type is not short", e); + } + + MessageMapper mapper = messageMapperMap.get(type); + + assert mapper != null : "No mapper defined for type " + type; + + NetworkMessage message = 
mapper.readMessage(ois); + builder.data(message); + } + return builder.build(); + } + + /** {@inheritDoc} */ + @Override public void serialize(Message message, OutputStream stream) throws Exception { + final Object data = message.data(); + + if (!(data instanceof NetworkMessage)) { + try (ObjectOutputStream oos = new ObjectOutputStream(stream)) { + message.writeExternal(oos); + } + return; + } + + Map<String, String> headers = message.headers(); + + assert headers.containsKey(HEADER_MESSAGE_TYPE) : "Missing message type header"; + + try (ObjectOutputStream oos = new ObjectOutputStream(stream)) { Review comment: same here ########## File path: modules/network/src/main/java/org/apache/ignite/network/scalecube/ScaleCubeNetworkClusterFactory.java ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.network.scalecube; + +import io.scalecube.cluster.Cluster; +import io.scalecube.cluster.ClusterImpl; +import io.scalecube.net.Address; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.ignite.network.MessageHandlerHolder; +import org.apache.ignite.network.NetworkCluster; +import org.apache.ignite.network.NetworkClusterContext; +import org.apache.ignite.network.NetworkClusterFactory; + +/** + * Factory for ScaleCubeNetworkCluster. + */ +public class ScaleCubeNetworkClusterFactory implements NetworkClusterFactory { + /** Unique name of network member. */ + private final String localMemberName; + + /** Local port. */ + private final int localPort; + + /** Network addresses to find another members in cluster. */ + private final List<String> addresses; + + /** + * Member resolver which allows convert {@link org.apache.ignite.network.NetworkMember} to inner ScaleCube type + * and otherwise. + */ + private final ScaleCubeMemberResolver memberResolver; + + /** + * @param localMemberName Unique name of network member. + * @param port Local port. + * @param addresses Network addresses to find another members in cluster. + */ + public ScaleCubeNetworkClusterFactory( + String localMemberName, + int port, + List<String> addresses, + ScaleCubeMemberResolver memberResolver + ) { + this.localMemberName = localMemberName; + localPort = port; + this.addresses = addresses; + this.memberResolver = memberResolver; + } + + /** + * Implementation of {@link NetworkCluster} based on ScaleCube. + * + * @param memberResolver Member resolve which allows convert {@link org.apache.ignite.network.NetworkMember} to + * inner ScaleCube type and otherwise. + * @param messageHandlerHolder Holder of all cluster message handlers. + * @return {@link NetworkCluster} instance. 
+ */ + @Override public NetworkCluster startCluster(NetworkClusterContext clusterContext) { + final MessageHandlerHolder handlerHolder = clusterContext.messageHandlerHolder(); + Cluster cluster = new ClusterImpl() + .handler(cl -> { + return new ScaleCubeMessageHandler(cl, memberResolver, handlerHolder); + }) + .config(opts -> opts + .memberAlias(localMemberName) + .transport(trans -> { + return trans.port(localPort) + .connectTimeout(1000_000_000) + .messageCodec(new ScaleCubeMessageCodec(clusterContext.messageMappers())); + }) + ) + .membership(opts -> opts.seedMembers(addresses.stream().map(Address::from).collect(Collectors.toList()))) Review comment: I think this address conversion should happen in the constructor so that we can be informed about malformed addresses earlier -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
