ibessonov commented on code in PR #1780:
URL: https://github.com/apache/ignite-3/pull/1780#discussion_r1141688164


##########
modules/network-api/src/main/java/org/apache/ignite/network/ChannelInfo.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data class with channel information.
+ *      May be used as channel pointer in {@link MessagingService} for sending 
messages in exclusive channel.
+ */
+public final class ChannelInfo {
+    static {
+        Map<Short, ChannelInfo> tmpChannels = new 
ConcurrentHashMap<>(IgniteUtils.capacity(Short.MAX_VALUE));
+        tmpChannels.put((short) 0, new ChannelInfo((short) 0, "Default"));
+
+        channels = tmpChannels;
+    }
+
+    private static final Map<Short, ChannelInfo> channels;
+
+    private static final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /**
+     * Channel identifier.
+     */
+    private final short id;
+
+    /**
+     * Channel name.
+     */
+    private final String name;
+
+    private ChannelInfo(short id, String name) {
+        this.id = id;
+        this.name = name;
+    }
+
+    /**
+     * Channel identifier, must be unique for each implementation.
+     *
+     * @return Channel identifier.
+     */
+    public short id() {
+        return id;
+    }
+
+    /**
+     * Returns channel name.
+     *
+     * @return Channel name.
+     */
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public int hashCode() {
+        return id();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof ChannelInfo)) {
+            return false;
+        }
+        ChannelInfo type = (ChannelInfo) obj;
+        return Objects.equals(id(), type.id());
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+
+    /**
+     * Returns default channel info.
+     *
+     * @return Default channel info.
+     */
+    public static ChannelInfo defaultChannel() {
+        lock.readLock().lock();

Review Comment:
   If it's a concurrent hash map, what's the reason for using explicit read 
lock?
   Another question - default channel info can be a constant, you don't need to 
access the map every time, unless you mock it in tests?



##########
modules/network-api/src/main/java/org/apache/ignite/network/ChannelInfo.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data class with channel information.
+ *      May be used as channel pointer in {@link MessagingService} for sending 
messages in exclusive channel.
+ */
+public final class ChannelInfo {
+    static {
+        Map<Short, ChannelInfo> tmpChannels = new 
ConcurrentHashMap<>(IgniteUtils.capacity(Short.MAX_VALUE));
+        tmpChannels.put((short) 0, new ChannelInfo((short) 0, "Default"));
+
+        channels = tmpChannels;
+    }
+
+    private static final Map<Short, ChannelInfo> channels;
+
+    private static final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /**
+     * Channel identifier.
+     */
+    private final short id;
+
+    /**
+     * Channel name.
+     */
+    private final String name;
+
+    private ChannelInfo(short id, String name) {
+        this.id = id;
+        this.name = name;
+    }
+
+    /**
+     * Channel identifier, must be unique for each implementation.
+     *
+     * @return Channel identifier.
+     */
+    public short id() {
+        return id;
+    }
+
+    /**
+     * Returns channel name.
+     *
+     * @return Channel name.
+     */
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public int hashCode() {
+        return id();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof ChannelInfo)) {
+            return false;
+        }
+        ChannelInfo type = (ChannelInfo) obj;
+        return Objects.equals(id(), type.id());
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+
+    /**
+     * Returns default channel info.
+     *
+     * @return Default channel info.
+     */
+    public static ChannelInfo defaultChannel() {
+        lock.readLock().lock();
+        try {
+            return channels.get((short) 0);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Try to register channel with provided identifier. If identifier already 
used
+     *    by another channel will throw {@link ChannelTypeAlreadyExist}.
+     *
+     * @param id Channel identifier.
+     * @param name Channel name.
+     * @return Register channel.
+     * @throws ChannelTypeAlreadyExist In case when channel identifier already 
used.
+     */
+    public static ChannelInfo register(short id, String name) throws 
ChannelTypeAlreadyExist {
+        lock.writeLock().lock();
+        try {
+            if (channels.get(id) == null) {

Review Comment:
   There's "putIfAbsent" (or a similar method) that you can use.
   Same question here - you already have concurrent map, what's the reason for 
the write lock?



##########
modules/network-api/src/main/java/org/apache/ignite/network/ChannelTypeAlreadyExist.java:
##########
@@ -18,26 +18,16 @@
 package org.apache.ignite.network;
 
 /**
- * Channel type with unique identifier.
+ * Throws when register channel with already used identifier.
  */
-public enum ChannelType {
-    DEFAULT((short) 0),
-    DEPLOYMENT_UNITS((short) 1),
-
-    TEST(Short.MAX_VALUE);
-
-    private final short id;
-
-    ChannelType(short id) {
-        this.id = id;
-    }
-
-    public short id() {
-        return id;
-    }
-
-    public static ChannelType fromId(short id) {
-        ChannelType[] values = values();
-        return values[id];
+public class ChannelTypeAlreadyExist extends Exception {

Review Comment:
   Just curious, what's the reason for a checked exception here?
   Such an exception should never occur in a production environment, because it 
would mean a bug in the product, not an expected outcome of someone's action. As 
far as I know, in Java we use runtime exceptions in such cases.



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java:
##########
@@ -87,6 +94,11 @@ final void tearDown() throws Exception {
         IgniteUtils.closeAll(startedManagers);
     }
 
+    @BeforeAll
+    static void registerChannel() {
+        testChannel = ChannelInfo.generate("ItConnectionManagerTest");

Review Comment:
   I liked the name `ChannelType` more. Right now I can't understand what type 
of "info" it holds.



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java:
##########
@@ -302,57 +311,56 @@ public void testStopTwice() throws Exception {
 
     @Test
     public void testOneConnectionType() throws Exception {
-        int size = 10000000;
-        char[] chars = new char[size];
-        Arrays.fill(chars, 'a');
-        String bigText = new String(chars);
+        String bigText = IgniteTestUtils.randomString(new Random(), 10000000);
 
+        try (ConnectionManagerWrapper server1 = startManager(4000);
+                ConnectionManagerWrapper server2 = startManager(4001)) {
+            NettySender sender = server1.openChannelTo(server2).get(3, 
TimeUnit.SECONDS);
 
-        ConnectionManagerWrapper server1 = startManager(4000);
-        ConnectionManagerWrapper server2 = startManager(4001);
+            TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).build();
+            TestMessage msg = messageFactory.testMessage().msg("test").build();
 
-        NettySender sender = server1.openChannelTo(server2, 
ChannelType.DEFAULT).get(3, TimeUnit.SECONDS);
+            CompletableFuture<Void> send1 = sender.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
+            CompletableFuture<Void> send2 = sender.send(new 
OutNetworkObject(msg, Collections.emptyList()));
 
-
-        TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).build();
-        TestMessage msg = messageFactory.testMessage().msg("test").build();
-
-        CompletableFuture<Void> send1 = sender.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
-        CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, 
Collections.emptyList()));
-
-        assertThat(send2, willCompleteSuccessfully());
-        assertThat(send1.isDone(), is(true));
-
-        server1.close();
-        server2.close();
+            assertThat(send2, willCompleteSuccessfully());
+            assertTrue(send1.isDone());
+        }
     }
 
     @Test
     public void testTwoConnectionTypes() throws Exception {
-        int size = 1000000000;
-        char[] chars = new char[size];
-        Arrays.fill(chars, 'a');
-        String bigText = new String(chars);
-
+        String bigText = IgniteTestUtils.randomString(new Random(), 100000000);
 
-        ConnectionManagerWrapper server1 = startManager(4000);
-        ConnectionManagerWrapper server2 = startManager(4001);
+        Map<Integer, String> map = Map.of(1, bigText, 2, bigText);
 
-        NettySender sender1 = server1.openChannelTo(server2, 
ChannelType.DEFAULT).get(3, TimeUnit.SECONDS);
-        NettySender sender2 = server1.openChannelTo(server2, 
ChannelType.TEST).get(3, TimeUnit.SECONDS);
+        try (ConnectionManagerWrapper server1 = startManager(4000);
+                ConnectionManagerWrapper server2 = startManager(4001)) {
 
+            NettySender sender1 = server1.openChannelTo(server2).get(3, 
TimeUnit.SECONDS);
+            NettySender sender2 = server1.openChannelTo(server2, 
testChannel).get(3, TimeUnit.SECONDS);
 
-        TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).build();
-        TestMessage msg = messageFactory.testMessage().msg("test").build();
+            TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).map(map).build();
 
-        CompletableFuture<Void> send1 = sender1.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
-        CompletableFuture<Void> send2 = sender2.send(new OutNetworkObject(msg, 
Collections.emptyList()));
+            CompletableFuture<Void> send1 = sender1.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
+            CompletableFuture.runAsync(() -> {

Review Comment:
   You should wait for this future's completion before moving to the next test.



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java:
##########
@@ -302,57 +311,56 @@ public void testStopTwice() throws Exception {
 
     @Test
     public void testOneConnectionType() throws Exception {
-        int size = 10000000;
-        char[] chars = new char[size];
-        Arrays.fill(chars, 'a');
-        String bigText = new String(chars);
+        String bigText = IgniteTestUtils.randomString(new Random(), 10000000);
 
+        try (ConnectionManagerWrapper server1 = startManager(4000);
+                ConnectionManagerWrapper server2 = startManager(4001)) {
+            NettySender sender = server1.openChannelTo(server2).get(3, 
TimeUnit.SECONDS);
 
-        ConnectionManagerWrapper server1 = startManager(4000);
-        ConnectionManagerWrapper server2 = startManager(4001);
+            TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).build();
+            TestMessage msg = messageFactory.testMessage().msg("test").build();
 
-        NettySender sender = server1.openChannelTo(server2, 
ChannelType.DEFAULT).get(3, TimeUnit.SECONDS);
+            CompletableFuture<Void> send1 = sender.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
+            CompletableFuture<Void> send2 = sender.send(new 
OutNetworkObject(msg, Collections.emptyList()));
 
-
-        TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).build();
-        TestMessage msg = messageFactory.testMessage().msg("test").build();
-
-        CompletableFuture<Void> send1 = sender.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
-        CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, 
Collections.emptyList()));
-
-        assertThat(send2, willCompleteSuccessfully());
-        assertThat(send1.isDone(), is(true));
-
-        server1.close();
-        server2.close();
+            assertThat(send2, willCompleteSuccessfully());
+            assertTrue(send1.isDone());
+        }
     }
 
     @Test
     public void testTwoConnectionTypes() throws Exception {
-        int size = 1000000000;
-        char[] chars = new char[size];
-        Arrays.fill(chars, 'a');
-        String bigText = new String(chars);
-
+        String bigText = IgniteTestUtils.randomString(new Random(), 100000000);
 
-        ConnectionManagerWrapper server1 = startManager(4000);
-        ConnectionManagerWrapper server2 = startManager(4001);
+        Map<Integer, String> map = Map.of(1, bigText, 2, bigText);
 
-        NettySender sender1 = server1.openChannelTo(server2, 
ChannelType.DEFAULT).get(3, TimeUnit.SECONDS);
-        NettySender sender2 = server1.openChannelTo(server2, 
ChannelType.TEST).get(3, TimeUnit.SECONDS);
+        try (ConnectionManagerWrapper server1 = startManager(4000);
+                ConnectionManagerWrapper server2 = startManager(4001)) {
 
+            NettySender sender1 = server1.openChannelTo(server2).get(3, 
TimeUnit.SECONDS);
+            NettySender sender2 = server1.openChannelTo(server2, 
testChannel).get(3, TimeUnit.SECONDS);
 
-        TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).build();
-        TestMessage msg = messageFactory.testMessage().msg("test").build();
+            TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).map(map).build();
 
-        CompletableFuture<Void> send1 = sender1.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
-        CompletableFuture<Void> send2 = sender2.send(new OutNetworkObject(msg, 
Collections.emptyList()));
+            CompletableFuture<Void> send1 = sender1.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
+            CompletableFuture.runAsync(() -> {
+                for (int i = 0; i < 100; i++) {
+                    sender2.send(new 
OutNetworkObject(messageFactory.emptyMessage().build(), 
Collections.emptyList()));
+                }
+            });
 
-        assertThat(send2, willCompleteSuccessfully());
-        assertThat(send1.isDone(), is(false));
+            AtomicBoolean atLeastOneSmallWas = new AtomicBoolean(false);
+            server2.connectionManager.addListener(inNetworkObject -> {

Review Comment:
   There must be a method to listen for specific message group



##########
modules/network-api/src/main/java/org/apache/ignite/network/ChannelInfo.java:
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.network;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Data class with channel information.
+ *      May be used as channel pointer in {@link MessagingService} for sending 
messages in exclusive channel.
+ */
+public final class ChannelInfo {
+    static {
+        Map<Short, ChannelInfo> tmpChannels = new 
ConcurrentHashMap<>(IgniteUtils.capacity(Short.MAX_VALUE));
+        tmpChannels.put((short) 0, new ChannelInfo((short) 0, "Default"));
+
+        channels = tmpChannels;
+    }
+
+    private static final Map<Short, ChannelInfo> channels;
+
+    private static final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    /**
+     * Channel identifier.
+     */
+    private final short id;
+
+    /**
+     * Channel name.
+     */
+    private final String name;
+
+    private ChannelInfo(short id, String name) {
+        this.id = id;
+        this.name = name;
+    }
+
+    /**
+     * Channel identifier, must be unique for each implementation.
+     *
+     * @return Channel identifier.
+     */
+    public short id() {
+        return id;
+    }
+
+    /**
+     * Returns channel name.
+     *
+     * @return Channel name.
+     */
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public int hashCode() {
+        return id();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof ChannelInfo)) {
+            return false;
+        }
+        ChannelInfo type = (ChannelInfo) obj;
+        return Objects.equals(id(), type.id());
+    }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
+
+    /**
+     * Returns default channel info.
+     *
+     * @return Default channel info.
+     */
+    public static ChannelInfo defaultChannel() {
+        lock.readLock().lock();
+        try {
+            return channels.get((short) 0);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Try to register channel with provided identifier. If identifier already 
used
+     *    by another channel will throw {@link ChannelTypeAlreadyExist}.
+     *
+     * @param id Channel identifier.
+     * @param name Channel name.
+     * @return Register channel.
+     * @throws ChannelTypeAlreadyExist In case when channel identifier already 
used.
+     */
+    public static ChannelInfo register(short id, String name) throws 
ChannelTypeAlreadyExist {
+        lock.writeLock().lock();
+        try {
+            if (channels.get(id) == null) {
+                ChannelInfo result = new ChannelInfo(id, name);
+                channels.put(id, result);
+                return result;
+            }
+
+            throw new ChannelTypeAlreadyExist(id, name);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Returns channel with provided identifier or
+     *      {@code null} if channel with id doesn't registered yet.
+     *
+     * @param id Channel identifier.
+     * @return Channel with provided identifier or {@code null} if channel 
with id doesn't registered yet.
+     */
+    public static ChannelInfo getChannel(short id) {
+        lock.readLock().lock();
+        try {
+            return channels.get(id);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Generate and return channel with free identifier or {@code null} if all 
identifiers already used.
+     *
+     * @param name Channel name.
+     * @return Channel with free identifier or {@code null} if all identifiers 
already used.
+     */
+    public static @Nullable ChannelInfo generate(String name) {
+        lock.writeLock().lock();
+        try {
+            for (short i = 0; i < Short.MAX_VALUE; i++) {

Review Comment:
   How can you guarantee that different Ignite nodes will produce same 
constants? Seems luck-dependent.
   I'd recommend only using `register` and expect that each module will somehow 
choose the unique constant.
   We already use such approach in message group ids, and in page memory page 
IO types (not as widely adopted, for obvious reasons)



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java:
##########
@@ -302,57 +311,56 @@ public void testStopTwice() throws Exception {
 
     @Test
     public void testOneConnectionType() throws Exception {
-        int size = 10000000;
-        char[] chars = new char[size];
-        Arrays.fill(chars, 'a');
-        String bigText = new String(chars);
+        String bigText = IgniteTestUtils.randomString(new Random(), 10000000);
 
+        try (ConnectionManagerWrapper server1 = startManager(4000);
+                ConnectionManagerWrapper server2 = startManager(4001)) {
+            NettySender sender = server1.openChannelTo(server2).get(3, 
TimeUnit.SECONDS);
 
-        ConnectionManagerWrapper server1 = startManager(4000);
-        ConnectionManagerWrapper server2 = startManager(4001);
+            TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).build();
+            TestMessage msg = messageFactory.testMessage().msg("test").build();
 
-        NettySender sender = server1.openChannelTo(server2, 
ChannelType.DEFAULT).get(3, TimeUnit.SECONDS);
+            CompletableFuture<Void> send1 = sender.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
+            CompletableFuture<Void> send2 = sender.send(new 
OutNetworkObject(msg, Collections.emptyList()));
 
-
-        TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).build();
-        TestMessage msg = messageFactory.testMessage().msg("test").build();
-
-        CompletableFuture<Void> send1 = sender.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
-        CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, 
Collections.emptyList()));
-
-        assertThat(send2, willCompleteSuccessfully());
-        assertThat(send1.isDone(), is(true));
-
-        server1.close();
-        server2.close();
+            assertThat(send2, willCompleteSuccessfully());
+            assertTrue(send1.isDone());
+        }
     }
 
     @Test
     public void testTwoConnectionTypes() throws Exception {
-        int size = 1000000000;
-        char[] chars = new char[size];
-        Arrays.fill(chars, 'a');
-        String bigText = new String(chars);
-
+        String bigText = IgniteTestUtils.randomString(new Random(), 100000000);
 
-        ConnectionManagerWrapper server1 = startManager(4000);
-        ConnectionManagerWrapper server2 = startManager(4001);
+        Map<Integer, String> map = Map.of(1, bigText, 2, bigText);
 
-        NettySender sender1 = server1.openChannelTo(server2, 
ChannelType.DEFAULT).get(3, TimeUnit.SECONDS);
-        NettySender sender2 = server1.openChannelTo(server2, 
ChannelType.TEST).get(3, TimeUnit.SECONDS);
+        try (ConnectionManagerWrapper server1 = startManager(4000);
+                ConnectionManagerWrapper server2 = startManager(4001)) {
 
+            NettySender sender1 = server1.openChannelTo(server2).get(3, 
TimeUnit.SECONDS);
+            NettySender sender2 = server1.openChannelTo(server2, 
testChannel).get(3, TimeUnit.SECONDS);
 
-        TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).build();
-        TestMessage msg = messageFactory.testMessage().msg("test").build();
+            TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).map(map).build();
 
-        CompletableFuture<Void> send1 = sender1.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
-        CompletableFuture<Void> send2 = sender2.send(new OutNetworkObject(msg, 
Collections.emptyList()));
+            CompletableFuture<Void> send1 = sender1.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
+            CompletableFuture.runAsync(() -> {
+                for (int i = 0; i < 100; i++) {
+                    sender2.send(new 
OutNetworkObject(messageFactory.emptyMessage().build(), 
Collections.emptyList()));
+                }
+            });
 
-        assertThat(send2, willCompleteSuccessfully());
-        assertThat(send1.isDone(), is(false));
+            AtomicBoolean atLeastOneSmallWas = new AtomicBoolean(false);
+            server2.connectionManager.addListener(inNetworkObject -> {
+                System.out.println(inNetworkObject.message().groupType());
+                if (inNetworkObject.message().groupType() == 
EmptyMessageImpl.GROUP_TYPE
+                        && inNetworkObject.message().messageType() == 
EmptyMessageImpl.TYPE) {
+                    atLeastOneSmallWas.set(true);
+                }
+            });
 
-        server1.close();
-        server2.close();
+            assertThat(send1, willCompleteSuccessfully());
+            assertTrue(atLeastOneSmallWas.get());

Review Comment:
   I don't understand these assertions. Do we check that, by the time we have 
received the big message, at least one small message has been received as well?
   I think you could also add an assertion that by the time one of the small 
messages is received, the big message is still being sent.
   Anyway, all of this still feels luck-based, was it really the only way to 
write the test? If there are no other options, I guess I'll approve it, but 
generally - I don't like this approach.
   Another question - why have you decided to put these tests into 
"integrationTest" directory? Aren't these supposed to be unit tests?



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java:
##########
@@ -302,57 +311,56 @@ public void testStopTwice() throws Exception {
 
     @Test
     public void testOneConnectionType() throws Exception {
-        int size = 10000000;
-        char[] chars = new char[size];
-        Arrays.fill(chars, 'a');
-        String bigText = new String(chars);
+        String bigText = IgniteTestUtils.randomString(new Random(), 10000000);
 
+        try (ConnectionManagerWrapper server1 = startManager(4000);
+                ConnectionManagerWrapper server2 = startManager(4001)) {
+            NettySender sender = server1.openChannelTo(server2).get(3, 
TimeUnit.SECONDS);
 
-        ConnectionManagerWrapper server1 = startManager(4000);
-        ConnectionManagerWrapper server2 = startManager(4001);
+            TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).build();
+            TestMessage msg = messageFactory.testMessage().msg("test").build();
 
-        NettySender sender = server1.openChannelTo(server2, 
ChannelType.DEFAULT).get(3, TimeUnit.SECONDS);
+            CompletableFuture<Void> send1 = sender.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
+            CompletableFuture<Void> send2 = sender.send(new 
OutNetworkObject(msg, Collections.emptyList()));
 
-
-        TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).build();
-        TestMessage msg = messageFactory.testMessage().msg("test").build();
-
-        CompletableFuture<Void> send1 = sender.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
-        CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, 
Collections.emptyList()));
-
-        assertThat(send2, willCompleteSuccessfully());
-        assertThat(send1.isDone(), is(true));
-
-        server1.close();
-        server2.close();
+            assertThat(send2, willCompleteSuccessfully());
+            assertTrue(send1.isDone());
+        }
     }
 
     @Test
     public void testTwoConnectionTypes() throws Exception {
-        int size = 1000000000;
-        char[] chars = new char[size];
-        Arrays.fill(chars, 'a');
-        String bigText = new String(chars);
-
+        String bigText = IgniteTestUtils.randomString(new Random(), 100000000);

Review Comment:
   I don't really like that we have a 100Mb message in test. Can you please 
document the test scenario, and justify why it has to be this big?



##########
modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java:
##########
@@ -302,57 +311,56 @@ public void testStopTwice() throws Exception {
 
     @Test
     public void testOneConnectionType() throws Exception {
-        int size = 10000000;
-        char[] chars = new char[size];
-        Arrays.fill(chars, 'a');
-        String bigText = new String(chars);
+        String bigText = IgniteTestUtils.randomString(new Random(), 10000000);
 
+        try (ConnectionManagerWrapper server1 = startManager(4000);
+                ConnectionManagerWrapper server2 = startManager(4001)) {
+            NettySender sender = server1.openChannelTo(server2).get(3, 
TimeUnit.SECONDS);
 
-        ConnectionManagerWrapper server1 = startManager(4000);
-        ConnectionManagerWrapper server2 = startManager(4001);
+            TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).build();
+            TestMessage msg = messageFactory.testMessage().msg("test").build();
 
-        NettySender sender = server1.openChannelTo(server2, 
ChannelType.DEFAULT).get(3, TimeUnit.SECONDS);
+            CompletableFuture<Void> send1 = sender.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
+            CompletableFuture<Void> send2 = sender.send(new 
OutNetworkObject(msg, Collections.emptyList()));
 
-
-        TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).build();
-        TestMessage msg = messageFactory.testMessage().msg("test").build();
-
-        CompletableFuture<Void> send1 = sender.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
-        CompletableFuture<Void> send2 = sender.send(new OutNetworkObject(msg, 
Collections.emptyList()));
-
-        assertThat(send2, willCompleteSuccessfully());
-        assertThat(send1.isDone(), is(true));
-
-        server1.close();
-        server2.close();
+            assertThat(send2, willCompleteSuccessfully());
+            assertTrue(send1.isDone());
+        }
     }
 
     @Test
     public void testTwoConnectionTypes() throws Exception {
-        int size = 1000000000;
-        char[] chars = new char[size];
-        Arrays.fill(chars, 'a');
-        String bigText = new String(chars);
-
+        String bigText = IgniteTestUtils.randomString(new Random(), 100000000);
 
-        ConnectionManagerWrapper server1 = startManager(4000);
-        ConnectionManagerWrapper server2 = startManager(4001);
+        Map<Integer, String> map = Map.of(1, bigText, 2, bigText);
 
-        NettySender sender1 = server1.openChannelTo(server2, 
ChannelType.DEFAULT).get(3, TimeUnit.SECONDS);
-        NettySender sender2 = server1.openChannelTo(server2, 
ChannelType.TEST).get(3, TimeUnit.SECONDS);
+        try (ConnectionManagerWrapper server1 = startManager(4000);
+                ConnectionManagerWrapper server2 = startManager(4001)) {
 
+            NettySender sender1 = server1.openChannelTo(server2).get(3, 
TimeUnit.SECONDS);
+            NettySender sender2 = server1.openChannelTo(server2, 
testChannel).get(3, TimeUnit.SECONDS);
 
-        TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).build();
-        TestMessage msg = messageFactory.testMessage().msg("test").build();
+            TestMessage bigMessage = 
messageFactory.testMessage().msg(bigText).map(map).build();
 
-        CompletableFuture<Void> send1 = sender1.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
-        CompletableFuture<Void> send2 = sender2.send(new OutNetworkObject(msg, 
Collections.emptyList()));
+            CompletableFuture<Void> send1 = sender1.send(new 
OutNetworkObject(bigMessage, Collections.emptyList()));
+            CompletableFuture.runAsync(() -> {
+                for (int i = 0; i < 100; i++) {
+                    sender2.send(new 
OutNetworkObject(messageFactory.emptyMessage().build(), 
Collections.emptyList()));
+                }
+            });
 
-        assertThat(send2, willCompleteSuccessfully());
-        assertThat(send1.isDone(), is(false));
+            AtomicBoolean atLeastOneSmallWas = new AtomicBoolean(false);
+            server2.connectionManager.addListener(inNetworkObject -> {
+                System.out.println(inNetworkObject.message().groupType());

Review Comment:
   I think you forgot to delete this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to