ibessonov commented on code in PR #1780:
URL: https://github.com/apache/ignite-3/pull/1780#discussion_r1138589573
##########
modules/network-api/src/main/java/org/apache/ignite/network/ChannelType.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+/**
+ * Channel type with unique identifier.
+ */
+public enum ChannelType {

Review Comment:
   There's not enough documentation here



##########
modules/network-api/src/main/java/org/apache/ignite/network/ChannelType.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+/**
+ * Channel type with unique identifier.
+ */
+public enum ChannelType {
+    DEFAULT((short) 0),
+    DEPLOYMENT_UNITS((short) 1),
+
+    TEST(Short.MAX_VALUE);

Review Comment:
   This is what I call an abstraction leak. Please invent something else, we can't make the API dependent on the implementation



##########
modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java:
##########
@@ -63,7 +67,11 @@ public interface MessagingService {
      * @param correlationId Correlation id when replying to the request.
      * @return Future of the send operation.
      */
-    CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId);
+    default CompletableFuture<Void> respond(ClusterNode recipient, NetworkMessage msg, long correlationId) {
+        return respond(recipient, ChannelType.DEFAULT, msg, correlationId);
+    }
+
+    CompletableFuture<Void> respond(ClusterNode recipient, ChannelType channelType, NetworkMessage msg, long correlationId);

Review Comment:
   Same here and later in the file



##########
modules/network-api/src/main/java/org/apache/ignite/network/MessagingService.java:
##########
@@ -52,7 +52,11 @@ public interface MessagingService {
      * @param msg Message which should be delivered.
      * @return Future of the send operation.
      */
-    CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg);
+    default CompletableFuture<Void> send(ClusterNode recipient, NetworkMessage msg) {
+        return send(recipient, ChannelType.DEFAULT, msg);
+    }
+
+    CompletableFuture<Void> send(ClusterNode recipient, ChannelType type, NetworkMessage msg);

Review Comment:
   Public methods in api modules should be properly documented



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java:
##########
@@ -296,6 +300,61 @@ public void testStopTwice() throws Exception {
         server.close();
     }

+    @Test
+    public void testOneConnectionType() throws Exception {
+        int size = 10000000;
+        char[] chars = new char[size];
+        Arrays.fill(chars, 'a');
+        String bigText = new String(chars);

Review Comment:
   You could use `IgniteTestUtils#randomString`



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java:
##########
@@ -296,6 +300,61 @@ public void testStopTwice() throws Exception {
         server.close();
     }

+    @Test
+    public void testOneConnectionType() throws Exception {
+        int size = 10000000;
+        char[] chars = new char[size];
+        Arrays.fill(chars, 'a');
+        String bigText = new String(chars);
+
+
+        ConnectionManagerWrapper server1 = startManager(4000);
+        ConnectionManagerWrapper server2 = startManager(4001);
+
+        NettySender sender = server1.openChannelTo(server2, ChannelType.DEFAULT).get(3, TimeUnit.SECONDS);
+
+
+        TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build();
+        TestMessage msg = messageFactory.testMessage().msg("test").build();
+
+        CompletableFuture<Void> send1 = sender.send(new OutNetworkObject(bigMessage, Collections.emptyList()));
+        CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, Collections.emptyList()));
+
+        assertThat(send2, willCompleteSuccessfully());
+        assertThat(send1.isDone(), is(true));
+
+        server1.close();

Review Comment:
   Close must be done in a `finally` block. Why don't you use try-with-resources? And just in case, there's a syntax for multiple resources in the same try block, if you're not aware of that.



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java:
##########
@@ -296,6 +300,61 @@ public void testStopTwice() throws Exception {
         server.close();
     }

+    @Test
+    public void testOneConnectionType() throws Exception {
+        int size = 10000000;
+        char[] chars = new char[size];
+        Arrays.fill(chars, 'a');
+        String bigText = new String(chars);
+

Review Comment:
   We usually separate blocks by one line, not two



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java:
##########
@@ -296,6 +300,61 @@ public void testStopTwice() throws Exception {
         server.close();
     }

+    @Test
+    public void testOneConnectionType() throws Exception {
+        int size = 10000000;
+        char[] chars = new char[size];
+        Arrays.fill(chars, 'a');
+        String bigText = new String(chars);
+
+
+        ConnectionManagerWrapper server1 = startManager(4000);
+        ConnectionManagerWrapper server2 = startManager(4001);
+
+        NettySender sender = server1.openChannelTo(server2, ChannelType.DEFAULT).get(3, TimeUnit.SECONDS);
+
+
+        TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build();
+        TestMessage msg = messageFactory.testMessage().msg("test").build();
+
+        CompletableFuture<Void> send1 = sender.send(new OutNetworkObject(bigMessage, Collections.emptyList()));
+        CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, Collections.emptyList()));
+
+        assertThat(send2, willCompleteSuccessfully());
+        assertThat(send1.isDone(), is(true));
+
+        server1.close();
+        server2.close();
+    }
+
+    @Test
+    public void testTwoConnectionTypes() throws Exception {
+        int size = 1000000000;
+        char[] chars = new char[size];
+        Arrays.fill(chars, 'a');
+        String bigText = new String(chars);
+
+
+        ConnectionManagerWrapper server1 = startManager(4000);
+        ConnectionManagerWrapper server2 = startManager(4001);
+
+        NettySender sender1 = server1.openChannelTo(server2, ChannelType.DEFAULT).get(3, TimeUnit.SECONDS);
+        NettySender sender2 = server1.openChannelTo(server2, ChannelType.TEST).get(3, TimeUnit.SECONDS);
+
+
+        TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build();
+        TestMessage msg = messageFactory.testMessage().msg("test").build();
+
+        CompletableFuture<Void> send1 = sender1.send(new OutNetworkObject(bigMessage, Collections.emptyList()));
+        CompletableFuture<Void> send2 = sender2.send(new OutNetworkObject(msg, Collections.emptyList()));
+
+        assertThat(send2, willCompleteSuccessfully());
+        assertThat(send1.isDone(), is(false));

Review Comment:
   No, this test will fail sooner or later, can we have something more predictable? A test that's based purely on the time it takes to send a certain amount of bytes is not robust enough.



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java:
##########
@@ -296,6 +300,61 @@ public void testStopTwice() throws Exception {
         server.close();
     }

+    @Test
+    public void testOneConnectionType() throws Exception {
+        int size = 10000000;
+        char[] chars = new char[size];
+        Arrays.fill(chars, 'a');
+        String bigText = new String(chars);
+
+
+        ConnectionManagerWrapper server1 = startManager(4000);
+        ConnectionManagerWrapper server2 = startManager(4001);
+
+        NettySender sender = server1.openChannelTo(server2, ChannelType.DEFAULT).get(3, TimeUnit.SECONDS);
+
+
+        TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build();
+        TestMessage msg = messageFactory.testMessage().msg("test").build();
+
+        CompletableFuture<Void> send1 = sender.send(new OutNetworkObject(bigMessage, Collections.emptyList()));
+        CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, Collections.emptyList()));
+
+        assertThat(send2, willCompleteSuccessfully());
+        assertThat(send1.isDone(), is(true));

Review Comment:
   It's interesting, why don't you use `assertTrue`? For code consistency? I'm ok with the current assertion btw, I just find it a bit of a mouthful



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -253,7 +254,8 @@ private void onMessage(InNetworkObject message) {
      * @param channel Channel from client to this {@link #server}.
      */
     private void onNewIncomingChannel(NettySender channel) {
-        NettySender oldChannel = channels.put(channel.consistentId(), channel);
+        ConnectorKey<String> key = new ConnectorKey<>(channel.consistentId(), fromId(channel.channelId()));

Review Comment:
   What happens if id is out of range? I guess we should fall back on the default type locally, writing a warning in the process. What do you think?



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/ConnectionManager.java:
##########
@@ -134,20 +136,20 @@ public ConnectionManager(
      * @param launchId Launch id of this node.
      * @param consistentId Consistent id of this node.
      * @param bootstrapFactory Bootstrap factory.
-     * @param clientHandhakeManagerFactory Factory for {@link RecoveryClientHandshakeManager} instances.
+     * @param clientHandshakeManagerFactory Factory for {@link RecoveryClientHandshakeManager} instances.

Review Comment:
   ```suggestion
        * @param clientHandshakeManagerFactory Factory for {@link RecoveryClientHandshakeManager} instances.
   ```



##########
modules/network-api/src/main/java/org/apache/ignite/network/ChannelType.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+/**
+ * Channel type with unique identifier.
+ */
+public enum ChannelType {
+    DEFAULT((short) 0),
+    DEPLOYMENT_UNITS((short) 1),
+
+    TEST(Short.MAX_VALUE);
+
+    private final short id;
+
+    ChannelType(short id) {
+        this.id = id;
+    }
+
+    public short id() {
+        return id;
+    }
+
+    public static ChannelType fromId(short id) {
+        ChannelType[] values = values();
+        return values[id];

Review Comment:
   No out-of-range check, no documentation.



##########
modules/network/src/test/java/org/apache/ignite/network/DefaultMessagingServiceTest.java:
##########
@@ -182,10 +182,12 @@ private Services createMessagingService(ClusterNode node, NetworkConfiguration n
         return new Services(connectionManager, messagingService);
     }

-    private static RecoveryClientHandhakeManagerFactory clientHandshakeManagerFactoryAdding(Runnable beforeHandshake) {
-        return new RecoveryClientHandhakeManagerFactory() {
+    private static RecoveryClientHandshakeManagerFactory clientHandshakeManagerFactoryAdding(Runnable beforeHandshake) {
+        return new RecoveryClientHandshakeManagerFactory() {
             @Override
-            public RecoveryClientHandshakeManager create(UUID launchId, String consistentId, short connectionId,
+            public RecoveryClientHandshakeManager create(UUID launchId,

Review Comment:
   Please move `UUID launchId,` to a separate line as well



##########
modules/network-api/src/main/java/org/apache/ignite/network/ChannelType.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+/**
+ * Channel type with unique identifier.
+ */
+public enum ChannelType {
+    DEFAULT((short) 0),

Review Comment:
   Should we have an additional channel for scalecube messages that's not available publicly? Maybe we should do that in a separate JIRA.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java:
##########
@@ -80,6 +84,10 @@ public String consistentId() {
         return consistentId;
     }

+    public short channelId() {

Review Comment:
   Please add documentation, the class would look more uniform this way
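
Regarding the out-of-range comments on `fromId`: a rough sketch of what a safer lookup could look like, purely as an illustration. `ChannelTypeSketch` is a hypothetical stand-in for `ChannelType`, the map-based lookup and the fallback to `DEFAULT` are assumptions about one possible fix (not code from the PR), and `System.err` stands in for whatever logger the module actually uses. Looking up by id instead of indexing `values()` also sidesteps the fact that `TEST` has id `Short.MAX_VALUE` but sits at array index 2.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for ChannelType; a sketch, not the code from the PR. */
enum ChannelTypeSketch {
    DEFAULT((short) 0),
    DEPLOYMENT_UNITS((short) 1),
    TEST(Short.MAX_VALUE);

    /** Lookup table keyed by the wire id, so ids do not have to match ordinals. */
    private static final Map<Short, ChannelTypeSketch> BY_ID = new HashMap<>();

    static {
        for (ChannelTypeSketch type : values()) {
            BY_ID.put(type.id, type);
        }
    }

    /** Identifier that is sent over the wire. */
    private final short id;

    ChannelTypeSketch(short id) {
        this.id = id;
    }

    /** Returns the identifier of this channel type. */
    short id() {
        return id;
    }

    /**
     * Resolves a channel type by its identifier.
     * Assumption: unknown ids fall back to DEFAULT with a warning instead of throwing.
     */
    static ChannelTypeSketch fromId(short id) {
        ChannelTypeSketch type = BY_ID.get(id);

        if (type == null) {
            // System.err is a placeholder for the real logger.
            System.err.println("Unknown channel id " + id + ", falling back to DEFAULT");

            return DEFAULT;
        }

        return type;
    }
}
```

Whether an unknown id should fall back or fail fast is a design decision for the PR; the sketch only shows that either choice is easy once the lookup no longer relies on array indexing.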

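
On the try-with-resources comment: a minimal sketch of the multi-resource syntax, assuming the wrapper type is `AutoCloseable`. `FakeManager` and `TryWithResourcesSketch` are hypothetical names used only for this illustration; they are not classes from the PR. Resources declared in the same `try` are closed in reverse order, even when the body throws.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for ConnectionManagerWrapper that records when it is closed. */
class FakeManager implements AutoCloseable {
    /** Names of closed managers, in the order close() was called. */
    static final List<String> CLOSED = new ArrayList<>();

    private final String name;

    FakeManager(String name) {
        this.name = name;
    }

    @Override
    public void close() {
        CLOSED.add(name);
    }
}

class TryWithResourcesSketch {
    /** Simulates a failing test body and returns the order in which the managers were closed. */
    static List<String> run() {
        FakeManager.CLOSED.clear();

        // Two resources in one try block, separated by a semicolon.
        try (FakeManager server1 = new FakeManager("server1");
                FakeManager server2 = new FakeManager("server2")) {
            throw new IllegalStateException("simulated assertion failure");
        } catch (IllegalStateException expected) {
            // Both managers have already been closed by the time we get here.
        }

        return new ArrayList<>(FakeManager.CLOSED);
    }
}
```

In the tests under review this would replace the trailing `server1.close(); server2.close();` calls, which are skipped whenever an assertion fails.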