ibessonov commented on code in PR #1780: URL: https://github.com/apache/ignite-3/pull/1780#discussion_r1141688164
########## modules/network-api/src/main/java/org/apache/ignite/network/ChannelInfo.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.internal.tostring.S; +import org.apache.ignite.internal.util.IgniteUtils; +import org.jetbrains.annotations.Nullable; + +/** + * Data class with channel information. + * May be used as channel pointer in {@link MessagingService} for sending messages in exclusive channel. + */ +public final class ChannelInfo { + static { + Map<Short, ChannelInfo> tmpChannels = new ConcurrentHashMap<>(IgniteUtils.capacity(Short.MAX_VALUE)); + tmpChannels.put((short) 0, new ChannelInfo((short) 0, "Default")); + + channels = tmpChannels; + } + + private static final Map<Short, ChannelInfo> channels; + + private static final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** + * Channel identifier. + */ + private final short id; + + /** + * Channel name. 
+ */ + private final String name; + + private ChannelInfo(short id, String name) { + this.id = id; + this.name = name; + } + + /** + * Channel identifier, must be unique for each implementation. + * + * @return Channel identifier. + */ + public short id() { + return id; + } + + /** + * Returns channel name. + * + * @return Channel name. + */ + public String name() { + return name; + } + + @Override + public int hashCode() { + return id(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof ChannelInfo)) { + return false; + } + ChannelInfo type = (ChannelInfo) obj; + return Objects.equals(id(), type.id()); + } + + @Override + public String toString() { + return S.toString(this); + } + + /** + * Returns default channel info. + * + * @return Default channel info. + */ + public static ChannelInfo defaultChannel() { + lock.readLock().lock(); Review Comment: If it's a concurrent hash map, what's the reason for using explicit read lock? Another question - default channel info can be a constant, you don't need to access the map every time, unless you mock it in tests? ########## modules/network-api/src/main/java/org/apache/ignite/network/ChannelInfo.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.internal.tostring.S; +import org.apache.ignite.internal.util.IgniteUtils; +import org.jetbrains.annotations.Nullable; + +/** + * Data class with channel information. + * May be used as channel pointer in {@link MessagingService} for sending messages in exclusive channel. + */ +public final class ChannelInfo { + static { + Map<Short, ChannelInfo> tmpChannels = new ConcurrentHashMap<>(IgniteUtils.capacity(Short.MAX_VALUE)); + tmpChannels.put((short) 0, new ChannelInfo((short) 0, "Default")); + + channels = tmpChannels; + } + + private static final Map<Short, ChannelInfo> channels; + + private static final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** + * Channel identifier. + */ + private final short id; + + /** + * Channel name. + */ + private final String name; + + private ChannelInfo(short id, String name) { + this.id = id; + this.name = name; + } + + /** + * Channel identifier, must be unique for each implementation. + * + * @return Channel identifier. + */ + public short id() { + return id; + } + + /** + * Returns channel name. + * + * @return Channel name. + */ + public String name() { + return name; + } + + @Override + public int hashCode() { + return id(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof ChannelInfo)) { + return false; + } + ChannelInfo type = (ChannelInfo) obj; + return Objects.equals(id(), type.id()); + } + + @Override + public String toString() { + return S.toString(this); + } + + /** + * Returns default channel info. + * + * @return Default channel info. 
+ */ + public static ChannelInfo defaultChannel() { + lock.readLock().lock(); + try { + return channels.get((short) 0); + } finally { + lock.readLock().unlock(); + } + } + + /** + * Try to register channel with provided identifier. If identifier already used + * by another channel will throw {@link ChannelTypeAlreadyExist}. + * + * @param id Channel identifier. + * @param name Channel name. + * @return Register channel. + * @throws ChannelTypeAlreadyExist In case when channel identifier already used. + */ + public static ChannelInfo register(short id, String name) throws ChannelTypeAlreadyExist { + lock.writeLock().lock(); + try { + if (channels.get(id) == null) { Review Comment: There's a "putIfAbsent" method (or similar ones) that you can use. Same question here - you already have a concurrent map, what's the reason for the write lock? ########## modules/network-api/src/main/java/org/apache/ignite/network/ChannelTypeAlreadyExist.java: ########## @@ -18,26 +18,16 @@ package org.apache.ignite.network; /** - * Channel type with unique identifier. + * Throws when register channel with already used identifier. */ -public enum ChannelType { - DEFAULT((short) 0), - DEPLOYMENT_UNITS((short) 1), - - TEST(Short.MAX_VALUE); - - private final short id; - - ChannelType(short id) { - this.id = id; - } - - public short id() { - return id; - } - - public static ChannelType fromId(short id) { - ChannelType[] values = values(); - return values[id]; +public class ChannelTypeAlreadyExist extends Exception { Review Comment: Just curious, what's the reason for a checked exception here? Such an exception should never occur in a production environment, because it would mean a bug in the product, not an expected outcome of someone's action. 
As far as I know, in Java we use runtime exceptions in such cases ########## modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java: ########## @@ -87,6 +94,11 @@ final void tearDown() throws Exception { IgniteUtils.closeAll(startedManagers); } + @BeforeAll + static void registerChannel() { + testChannel = ChannelInfo.generate("ItConnectionManagerTest"); Review Comment: I liked the name `ChannelType` more. Right now I can't understand, what type of "info" it holds ########## modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java: ########## @@ -302,57 +311,56 @@ public void testStopTwice() throws Exception { @Test public void testOneConnectionType() throws Exception { - int size = 10000000; - char[] chars = new char[size]; - Arrays.fill(chars, 'a'); - String bigText = new String(chars); + String bigText = IgniteTestUtils.randomString(new Random(), 10000000); + try (ConnectionManagerWrapper server1 = startManager(4000); + ConnectionManagerWrapper server2 = startManager(4001)) { + NettySender sender = server1.openChannelTo(server2).get(3, TimeUnit.SECONDS); - ConnectionManagerWrapper server1 = startManager(4000); - ConnectionManagerWrapper server2 = startManager(4001); + TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build(); + TestMessage msg = messageFactory.testMessage().msg("test").build(); - NettySender sender = server1.openChannelTo(server2, ChannelType.DEFAULT).get(3, TimeUnit.SECONDS); + CompletableFuture<Void> send1 = sender.send(new OutNetworkObject(bigMessage, Collections.emptyList())); + CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, Collections.emptyList())); - - TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build(); - TestMessage msg = messageFactory.testMessage().msg("test").build(); - - CompletableFuture<Void> send1 = sender.send(new OutNetworkObject(bigMessage, 
Collections.emptyList())); - CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, Collections.emptyList())); - - assertThat(send2, willCompleteSuccessfully()); - assertThat(send1.isDone(), is(true)); - - server1.close(); - server2.close(); + assertThat(send2, willCompleteSuccessfully()); + assertTrue(send1.isDone()); + } } @Test public void testTwoConnectionTypes() throws Exception { - int size = 1000000000; - char[] chars = new char[size]; - Arrays.fill(chars, 'a'); - String bigText = new String(chars); - + String bigText = IgniteTestUtils.randomString(new Random(), 100000000); - ConnectionManagerWrapper server1 = startManager(4000); - ConnectionManagerWrapper server2 = startManager(4001); + Map<Integer, String> map = Map.of(1, bigText, 2, bigText); - NettySender sender1 = server1.openChannelTo(server2, ChannelType.DEFAULT).get(3, TimeUnit.SECONDS); - NettySender sender2 = server1.openChannelTo(server2, ChannelType.TEST).get(3, TimeUnit.SECONDS); + try (ConnectionManagerWrapper server1 = startManager(4000); + ConnectionManagerWrapper server2 = startManager(4001)) { + NettySender sender1 = server1.openChannelTo(server2).get(3, TimeUnit.SECONDS); + NettySender sender2 = server1.openChannelTo(server2, testChannel).get(3, TimeUnit.SECONDS); - TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build(); - TestMessage msg = messageFactory.testMessage().msg("test").build(); + TestMessage bigMessage = messageFactory.testMessage().msg(bigText).map(map).build(); - CompletableFuture<Void> send1 = sender1.send(new OutNetworkObject(bigMessage, Collections.emptyList())); - CompletableFuture<Void> send2 = sender2.send(new OutNetworkObject(msg, Collections.emptyList())); + CompletableFuture<Void> send1 = sender1.send(new OutNetworkObject(bigMessage, Collections.emptyList())); + CompletableFuture.runAsync(() -> { Review Comment: You should wait for this future completion before moving to the next test ########## 
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java: ########## @@ -302,57 +311,56 @@ public void testStopTwice() throws Exception { @Test public void testOneConnectionType() throws Exception { - int size = 10000000; - char[] chars = new char[size]; - Arrays.fill(chars, 'a'); - String bigText = new String(chars); + String bigText = IgniteTestUtils.randomString(new Random(), 10000000); + try (ConnectionManagerWrapper server1 = startManager(4000); + ConnectionManagerWrapper server2 = startManager(4001)) { + NettySender sender = server1.openChannelTo(server2).get(3, TimeUnit.SECONDS); - ConnectionManagerWrapper server1 = startManager(4000); - ConnectionManagerWrapper server2 = startManager(4001); + TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build(); + TestMessage msg = messageFactory.testMessage().msg("test").build(); - NettySender sender = server1.openChannelTo(server2, ChannelType.DEFAULT).get(3, TimeUnit.SECONDS); + CompletableFuture<Void> send1 = sender.send(new OutNetworkObject(bigMessage, Collections.emptyList())); + CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, Collections.emptyList())); - - TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build(); - TestMessage msg = messageFactory.testMessage().msg("test").build(); - - CompletableFuture<Void> send1 = sender.send(new OutNetworkObject(bigMessage, Collections.emptyList())); - CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, Collections.emptyList())); - - assertThat(send2, willCompleteSuccessfully()); - assertThat(send1.isDone(), is(true)); - - server1.close(); - server2.close(); + assertThat(send2, willCompleteSuccessfully()); + assertTrue(send1.isDone()); + } } @Test public void testTwoConnectionTypes() throws Exception { - int size = 1000000000; - char[] chars = new char[size]; - Arrays.fill(chars, 'a'); - String bigText = new String(chars); - + String bigText = 
IgniteTestUtils.randomString(new Random(), 100000000); - ConnectionManagerWrapper server1 = startManager(4000); - ConnectionManagerWrapper server2 = startManager(4001); + Map<Integer, String> map = Map.of(1, bigText, 2, bigText); - NettySender sender1 = server1.openChannelTo(server2, ChannelType.DEFAULT).get(3, TimeUnit.SECONDS); - NettySender sender2 = server1.openChannelTo(server2, ChannelType.TEST).get(3, TimeUnit.SECONDS); + try (ConnectionManagerWrapper server1 = startManager(4000); + ConnectionManagerWrapper server2 = startManager(4001)) { + NettySender sender1 = server1.openChannelTo(server2).get(3, TimeUnit.SECONDS); + NettySender sender2 = server1.openChannelTo(server2, testChannel).get(3, TimeUnit.SECONDS); - TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build(); - TestMessage msg = messageFactory.testMessage().msg("test").build(); + TestMessage bigMessage = messageFactory.testMessage().msg(bigText).map(map).build(); - CompletableFuture<Void> send1 = sender1.send(new OutNetworkObject(bigMessage, Collections.emptyList())); - CompletableFuture<Void> send2 = sender2.send(new OutNetworkObject(msg, Collections.emptyList())); + CompletableFuture<Void> send1 = sender1.send(new OutNetworkObject(bigMessage, Collections.emptyList())); + CompletableFuture.runAsync(() -> { + for (int i = 0; i < 100; i++) { + sender2.send(new OutNetworkObject(messageFactory.emptyMessage().build(), Collections.emptyList())); + } + }); - assertThat(send2, willCompleteSuccessfully()); - assertThat(send1.isDone(), is(false)); + AtomicBoolean atLeastOneSmallWas = new AtomicBoolean(false); + server2.connectionManager.addListener(inNetworkObject -> { Review Comment: There must be a method to listen for specific message group ########## modules/network-api/src/main/java/org/apache/ignite/network/ChannelInfo.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.network; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.internal.tostring.S; +import org.apache.ignite.internal.util.IgniteUtils; +import org.jetbrains.annotations.Nullable; + +/** + * Data class with channel information. + * May be used as channel pointer in {@link MessagingService} for sending messages in exclusive channel. + */ +public final class ChannelInfo { + static { + Map<Short, ChannelInfo> tmpChannels = new ConcurrentHashMap<>(IgniteUtils.capacity(Short.MAX_VALUE)); + tmpChannels.put((short) 0, new ChannelInfo((short) 0, "Default")); + + channels = tmpChannels; + } + + private static final Map<Short, ChannelInfo> channels; + + private static final ReadWriteLock lock = new ReentrantReadWriteLock(); + + /** + * Channel identifier. + */ + private final short id; + + /** + * Channel name. + */ + private final String name; + + private ChannelInfo(short id, String name) { + this.id = id; + this.name = name; + } + + /** + * Channel identifier, must be unique for each implementation. + * + * @return Channel identifier. 
+ */ + public short id() { + return id; + } + + /** + * Returns channel name. + * + * @return Channel name. + */ + public String name() { + return name; + } + + @Override + public int hashCode() { + return id(); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof ChannelInfo)) { + return false; + } + ChannelInfo type = (ChannelInfo) obj; + return Objects.equals(id(), type.id()); + } + + @Override + public String toString() { + return S.toString(this); + } + + /** + * Returns default channel info. + * + * @return Default channel info. + */ + public static ChannelInfo defaultChannel() { + lock.readLock().lock(); + try { + return channels.get((short) 0); + } finally { + lock.readLock().unlock(); + } + } + + /** + * Try to register channel with provided identifier. If identifier already used + * by another channel will throw {@link ChannelTypeAlreadyExist}. + * + * @param id Channel identifier. + * @param name Channel name. + * @return Register channel. + * @throws ChannelTypeAlreadyExist In case when channel identifier already used. + */ + public static ChannelInfo register(short id, String name) throws ChannelTypeAlreadyExist { + lock.writeLock().lock(); + try { + if (channels.get(id) == null) { + ChannelInfo result = new ChannelInfo(id, name); + channels.put(id, result); + return result; + } + + throw new ChannelTypeAlreadyExist(id, name); + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Returns channel with provided identifier or + * {@code null} if channel with id doesn't registered yet. + * + * @param id Channel identifier. + * @return Channel with provided identifier or {@code null} if channel with id doesn't registered yet. + */ + public static ChannelInfo getChannel(short id) { + lock.readLock().lock(); + try { + return channels.get(id); + } finally { + lock.readLock().unlock(); + } + } + + /** + * Generate and return channel with free identifier or {@code null} if all identifiers already used. 
+ * + * @param name Channel name. + * @return Channel with free identifier or {@code null} if all identifiers already used. + */ + public static @Nullable ChannelInfo generate(String name) { + lock.writeLock().lock(); + try { + for (short i = 0; i < Short.MAX_VALUE; i++) { Review Comment: How can you guarantee that different Ignite nodes will produce same constants? Seems luck-dependent. I'd recommend only using `register` and expect that each module will somehow choose the unique constant. We already use such approach in message group ids, and in page memory page IO types (not as widely adopted, for obvious reasons) ########## modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java: ########## @@ -302,57 +311,56 @@ public void testStopTwice() throws Exception { @Test public void testOneConnectionType() throws Exception { - int size = 10000000; - char[] chars = new char[size]; - Arrays.fill(chars, 'a'); - String bigText = new String(chars); + String bigText = IgniteTestUtils.randomString(new Random(), 10000000); + try (ConnectionManagerWrapper server1 = startManager(4000); + ConnectionManagerWrapper server2 = startManager(4001)) { + NettySender sender = server1.openChannelTo(server2).get(3, TimeUnit.SECONDS); - ConnectionManagerWrapper server1 = startManager(4000); - ConnectionManagerWrapper server2 = startManager(4001); + TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build(); + TestMessage msg = messageFactory.testMessage().msg("test").build(); - NettySender sender = server1.openChannelTo(server2, ChannelType.DEFAULT).get(3, TimeUnit.SECONDS); + CompletableFuture<Void> send1 = sender.send(new OutNetworkObject(bigMessage, Collections.emptyList())); + CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, Collections.emptyList())); - - TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build(); - TestMessage msg = 
messageFactory.testMessage().msg("test").build(); - - CompletableFuture<Void> send1 = sender.send(new OutNetworkObject(bigMessage, Collections.emptyList())); - CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, Collections.emptyList())); - - assertThat(send2, willCompleteSuccessfully()); - assertThat(send1.isDone(), is(true)); - - server1.close(); - server2.close(); + assertThat(send2, willCompleteSuccessfully()); + assertTrue(send1.isDone()); + } } @Test public void testTwoConnectionTypes() throws Exception { - int size = 1000000000; - char[] chars = new char[size]; - Arrays.fill(chars, 'a'); - String bigText = new String(chars); - + String bigText = IgniteTestUtils.randomString(new Random(), 100000000); - ConnectionManagerWrapper server1 = startManager(4000); - ConnectionManagerWrapper server2 = startManager(4001); + Map<Integer, String> map = Map.of(1, bigText, 2, bigText); - NettySender sender1 = server1.openChannelTo(server2, ChannelType.DEFAULT).get(3, TimeUnit.SECONDS); - NettySender sender2 = server1.openChannelTo(server2, ChannelType.TEST).get(3, TimeUnit.SECONDS); + try (ConnectionManagerWrapper server1 = startManager(4000); + ConnectionManagerWrapper server2 = startManager(4001)) { + NettySender sender1 = server1.openChannelTo(server2).get(3, TimeUnit.SECONDS); + NettySender sender2 = server1.openChannelTo(server2, testChannel).get(3, TimeUnit.SECONDS); - TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build(); - TestMessage msg = messageFactory.testMessage().msg("test").build(); + TestMessage bigMessage = messageFactory.testMessage().msg(bigText).map(map).build(); - CompletableFuture<Void> send1 = sender1.send(new OutNetworkObject(bigMessage, Collections.emptyList())); - CompletableFuture<Void> send2 = sender2.send(new OutNetworkObject(msg, Collections.emptyList())); + CompletableFuture<Void> send1 = sender1.send(new OutNetworkObject(bigMessage, Collections.emptyList())); + CompletableFuture.runAsync(() -> { + for 
(int i = 0; i < 100; i++) { + sender2.send(new OutNetworkObject(messageFactory.emptyMessage().build(), Collections.emptyList())); + } + }); - assertThat(send2, willCompleteSuccessfully()); - assertThat(send1.isDone(), is(false)); + AtomicBoolean atLeastOneSmallWas = new AtomicBoolean(false); + server2.connectionManager.addListener(inNetworkObject -> { + System.out.println(inNetworkObject.message().groupType()); + if (inNetworkObject.message().groupType() == EmptyMessageImpl.GROUP_TYPE + && inNetworkObject.message().messageType() == EmptyMessageImpl.TYPE) { + atLeastOneSmallWas.set(true); + } + }); - server1.close(); - server2.close(); + assertThat(send1, willCompleteSuccessfully()); + assertTrue(atLeastOneSmallWas.get()); Review Comment: I don't understand these assertions. We check that by the time we receive the big message, at least one small message has been received as well? I think you could also add an assertion that by the time one of the small messages is received, the big message is still being sent. Anyway, all of this still feels luck-based, was it really the only way to write the test? If there are no other options, I guess I'll approve it, but generally - I don't like this approach. Another question - why have you decided to put these tests into the "integrationTest" directory? Aren't these supposed to be unit tests? 
########## modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java: ########## @@ -302,57 +311,56 @@ public void testStopTwice() throws Exception { @Test public void testOneConnectionType() throws Exception { - int size = 10000000; - char[] chars = new char[size]; - Arrays.fill(chars, 'a'); - String bigText = new String(chars); + String bigText = IgniteTestUtils.randomString(new Random(), 10000000); + try (ConnectionManagerWrapper server1 = startManager(4000); + ConnectionManagerWrapper server2 = startManager(4001)) { + NettySender sender = server1.openChannelTo(server2).get(3, TimeUnit.SECONDS); - ConnectionManagerWrapper server1 = startManager(4000); - ConnectionManagerWrapper server2 = startManager(4001); + TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build(); + TestMessage msg = messageFactory.testMessage().msg("test").build(); - NettySender sender = server1.openChannelTo(server2, ChannelType.DEFAULT).get(3, TimeUnit.SECONDS); + CompletableFuture<Void> send1 = sender.send(new OutNetworkObject(bigMessage, Collections.emptyList())); + CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, Collections.emptyList())); - - TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build(); - TestMessage msg = messageFactory.testMessage().msg("test").build(); - - CompletableFuture<Void> send1 = sender.send(new OutNetworkObject(bigMessage, Collections.emptyList())); - CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, Collections.emptyList())); - - assertThat(send2, willCompleteSuccessfully()); - assertThat(send1.isDone(), is(true)); - - server1.close(); - server2.close(); + assertThat(send2, willCompleteSuccessfully()); + assertTrue(send1.isDone()); + } } @Test public void testTwoConnectionTypes() throws Exception { - int size = 1000000000; - char[] chars = new char[size]; - Arrays.fill(chars, 'a'); - String bigText = new String(chars); - + String 
bigText = IgniteTestUtils.randomString(new Random(), 100000000); Review Comment: I don't really like that we have a 100Mb message in test. Can you please document the test scenario, and justify why it has to be this big? ########## modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java: ########## @@ -302,57 +311,56 @@ public void testStopTwice() throws Exception { @Test public void testOneConnectionType() throws Exception { - int size = 10000000; - char[] chars = new char[size]; - Arrays.fill(chars, 'a'); - String bigText = new String(chars); + String bigText = IgniteTestUtils.randomString(new Random(), 10000000); + try (ConnectionManagerWrapper server1 = startManager(4000); + ConnectionManagerWrapper server2 = startManager(4001)) { + NettySender sender = server1.openChannelTo(server2).get(3, TimeUnit.SECONDS); - ConnectionManagerWrapper server1 = startManager(4000); - ConnectionManagerWrapper server2 = startManager(4001); + TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build(); + TestMessage msg = messageFactory.testMessage().msg("test").build(); - NettySender sender = server1.openChannelTo(server2, ChannelType.DEFAULT).get(3, TimeUnit.SECONDS); + CompletableFuture<Void> send1 = sender.send(new OutNetworkObject(bigMessage, Collections.emptyList())); + CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, Collections.emptyList())); - - TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build(); - TestMessage msg = messageFactory.testMessage().msg("test").build(); - - CompletableFuture<Void> send1 = sender.send(new OutNetworkObject(bigMessage, Collections.emptyList())); - CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, Collections.emptyList())); - - assertThat(send2, willCompleteSuccessfully()); - assertThat(send1.isDone(), is(true)); - - server1.close(); - server2.close(); + assertThat(send2, willCompleteSuccessfully()); + 
assertTrue(send1.isDone()); + } } @Test public void testTwoConnectionTypes() throws Exception { - int size = 1000000000; - char[] chars = new char[size]; - Arrays.fill(chars, 'a'); - String bigText = new String(chars); - + String bigText = IgniteTestUtils.randomString(new Random(), 100000000); - ConnectionManagerWrapper server1 = startManager(4000); - ConnectionManagerWrapper server2 = startManager(4001); + Map<Integer, String> map = Map.of(1, bigText, 2, bigText); - NettySender sender1 = server1.openChannelTo(server2, ChannelType.DEFAULT).get(3, TimeUnit.SECONDS); - NettySender sender2 = server1.openChannelTo(server2, ChannelType.TEST).get(3, TimeUnit.SECONDS); + try (ConnectionManagerWrapper server1 = startManager(4000); + ConnectionManagerWrapper server2 = startManager(4001)) { + NettySender sender1 = server1.openChannelTo(server2).get(3, TimeUnit.SECONDS); + NettySender sender2 = server1.openChannelTo(server2, testChannel).get(3, TimeUnit.SECONDS); - TestMessage bigMessage = messageFactory.testMessage().msg(bigText).build(); - TestMessage msg = messageFactory.testMessage().msg("test").build(); + TestMessage bigMessage = messageFactory.testMessage().msg(bigText).map(map).build(); - CompletableFuture<Void> send1 = sender1.send(new OutNetworkObject(bigMessage, Collections.emptyList())); - CompletableFuture<Void> send2 = sender2.send(new OutNetworkObject(msg, Collections.emptyList())); + CompletableFuture<Void> send1 = sender1.send(new OutNetworkObject(bigMessage, Collections.emptyList())); + CompletableFuture.runAsync(() -> { + for (int i = 0; i < 100; i++) { + sender2.send(new OutNetworkObject(messageFactory.emptyMessage().build(), Collections.emptyList())); + } + }); - assertThat(send2, willCompleteSuccessfully()); - assertThat(send1.isDone(), is(false)); + AtomicBoolean atLeastOneSmallWas = new AtomicBoolean(false); + server2.connectionManager.addListener(inNetworkObject -> { + System.out.println(inNetworkObject.message().groupType()); Review Comment: I think 
you forgot to delete this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
