alex-plekhanov commented on a change in pull request #8206:
URL: https://github.com/apache/ignite/pull/8206#discussion_r489364226



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -114,46 +135,32 @@
         Function<ClientChannelConfiguration, ClientChannel> chFactory,
         ClientConfiguration clientCfg,
         IgniteBinary binary
-    ) throws ClientException {
+    ) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
-
-        channels = new ClientChannelHolder[addrs.size()];
-
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
-
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
-
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
+        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled();
 
         affinityCtx = new ClientCacheAffinityContext(binary);
+    }
 
-        ClientConnectionException lastEx = null;
-
-        for (int i = 0; i < channels.length; i++) {
-            try {
-                channels[curChIdx].getOrCreateChannel();
-
-                if (partitionAwarenessEnabled)
-                    initAllChannelsAsync();
-
-                return;
-            } catch (ClientConnectionException e) {
-                lastEx = e;
-
-                rollCurrentChannel();
-            }
-        }
-
-        throw lastEx;
+    /**
+     * Establishing connections to servers. If partition awareness feature is 
enabled connections are created
+     * for every configured server. Otherwise only default channel is 
connected.
+     */
+    void initConnection() {
+        channelsInit(false);
+        if (!partitionAwarenessEnabled)

Review comment:
       If partition awareness is enabled, we still need to init the default 
channel on startup.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -461,9 +418,8 @@ private void initAllChannelsAsync() {
      * @param ch Channel.
      */
     private void onTopologyChanged(ClientChannel ch) {
-        if (partitionAwarenessEnabled && 
affinityCtx.updateLastTopologyVersion(ch.serverTopologyVersion(),
-            ch.serverNodeId()))
-            initAllChannelsAsync();
+        if (affinityCtx.updateLastTopologyVersion(ch.serverTopologyVersion(), 
ch.serverNodeId()))
+            channelsInit(true);

Review comment:
       This method is invoked inside the request processing thread. I don't like 
the idea of a sync reinit of channel holders (which can do additional sync HTTP 
requests) on each topology change in this thread.
   I think we should do it async if partition awareness is enabled. 
Additionally, we can do it sync in the sending thread on channel failure, if we 
have detected a topology change before that.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,16 +429,230 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean shouldStopChannelsReinit() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    synchronized void initChannelHolders(boolean force) {
+        List<ClientChannelHolder> holders = channels;
+
+        if (!force && holders != null)
+            return;
+
+        startChannelsReInit = System.currentTimeMillis();
+
+        // Enable parallel threads to schedule new init of channel holders.
+        scheduledChannelsReinit.set(false);
+
+        Set<InetSocketAddress> newAddrs = null;
+
+        if (clientCfg.getAddressesFinder() != null) {
+            String[] hostAddrs = 
clientCfg.getAddressesFinder().getServerAddresses();
+
+            if (hostAddrs.length == 0)
+                throw new ClientException("Empty addresses");
+
+            if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
+                newAddrs = parsedAddresses(hostAddrs);
+                prevHostAddrs = hostAddrs;
+            }
+        } else if (holders == null)
+            newAddrs = parsedAddresses(clientCfg.getAddresses());
+
+        if (newAddrs == null) {
+            finishChannelsReInit = System.currentTimeMillis();
+            return;
+        }
+
+        Map<InetSocketAddress, ClientChannelHolder> curAddrs = 
Collections.emptyMap();
+        Set<InetSocketAddress> allAddrs = new HashSet<>(newAddrs);
+
+        if (holders != null) {
+            curAddrs = holders.stream()
+                .collect(Collectors.toMap(h -> h.chCfg.getAddress(), h -> h));
+
+            allAddrs.addAll(curAddrs.keySet());
+        }
+
+        List<ClientChannelHolder> reinitHolders = new ArrayList<>();
+        // The variable holds a new index of default channel after topology 
change.
+        // Suppose that reuse of the channel is better than open new 
connection.
+        int dfltChannelIdx = -1;
+
+        ClientChannelHolder currDfltHolder = null;
+        int idx = curChIdx;
+        if (idx != -1)
+            currDfltHolder = holders.get(idx);
+
+        for (InetSocketAddress addr : allAddrs) {
+            if (shouldStopChannelsReinit())
+                return;
+
+            // Obsolete addr, to be removed.
+            if (!newAddrs.contains(addr)) {
+                curAddrs.get(addr).close();
+
+                continue;
+            }
+
+            // Create new holders for new addrs.
+            if (!curAddrs.containsKey(addr)) {
+                ClientChannelHolder hld = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addr));
+                reinitHolders.add(hld);
+
+                continue;
+            }
+
+            // This holder is up to date.
+            ClientChannelHolder hld = curAddrs.get(addr);
+            reinitHolders.add(hld);
+            if (hld == currDfltHolder)
+                dfltChannelIdx = reinitHolders.size() - 1;
+        }
+
+        if (dfltChannelIdx == -1)
+            dfltChannelIdx = new Random().nextInt(reinitHolders.size());
+
+        curChannelsGuard.writeLock().lock();
+        try {
+            channels = reinitHolders;
+            curChIdx = dfltChannelIdx;
+        }
+        finally {
+            curChannelsGuard.writeLock().unlock();
+        }
+
+        finishChannelsReInit = System.currentTimeMillis();
+    }
+
+    /** Initialization of channels. */
+    private void channelsInit(boolean force) {
+        if (!force && channels != null)
+            return;
+
+        // Skip if there is already channels reinit scheduled.
+        // Flag is set back when a thread comes in synchronized 
initChannelHolders.
+        if (scheduledChannelsReinit.compareAndSet(false, true)) {
+            initChannelHolders(force);
+
+            if (partitionAwarenessEnabled)
+                initAllChannelsAsync();
+        }
+    }
+
+    /**
+     * Apply specified {@code function} on a channel corresponding to 
specified {@code nodeId}.
+     */
+    private <T> T applyOnNodeChannel(UUID nodeId, Function<ClientChannel, T> 
function) {
+        ClientChannelHolder hld = null;
+        ClientChannel channel = null;
+
+        try {
+            hld = nodeChannels.get(nodeId);
+
+            channel = hld != null ? hld.getOrCreateChannel() : null;
+
+            if (channel != null)
+                return function.apply(channel);
+
+        } catch (ClientConnectionException e) {
+            onChannelFailure(hld, channel);
+        }
+
+        return null;
+    }
+
+    /**
+     * Apply specified {@code function} on any of available channel.
+     */
+    private <T> T applyOnDefaultChannel(Function<ClientChannel, T> function) {
+        List<ClientChannelHolder> holders = channels;
+
+        if (holders == null)
+            throw new ClientException("Connections to nodes aren't 
initialized.");
+
+        int size = holders.size();
+
+        int attemptsLimit = clientCfg.getRetryLimit() > 0 ?
+            Math.min(clientCfg.getRetryLimit(), size) : size;
+
+        ClientConnectionException failure = null;
+
+        for (int attempt = 0; attempt < attemptsLimit; attempt++) {
+            ClientChannelHolder hld = null;
+            ClientChannel c = null;
+            try {
+                if (closed)
+                    throw new ClientException("Channel is closed");
+
+                curChannelsGuard.readLock().lock();
+                try {
+                    hld = channels.get(curChIdx);
+                } finally {
+                    curChannelsGuard.readLock().unlock();
+                }
+
+                c = hld.getOrCreateChannel();
+                if (c != null)
+                    return function.apply(c);
+            }
+            catch (ClientConnectionException e) {
+                if (failure == null)
+                    failure = e;
+                else
+                    failure.addSuppressed(e);
+
+                onChannelFailure(hld, c);
+            }
+        }
+
+        throw failure;
+    }
+
+    /**
+     * Try apply specified {@code function} on a channel corresponding to 
{@code tryNodeId}.
+     * If failed then apply the function on any available channel.
+     */
+    private <T> T apply(UUID tryNodeId, Function<ClientChannel, T> function) {

Review comment:
       Let's rename it to something more meaningful (like the other two 
`apply...` methods). For example, `applyOnNodeChannelWithRetry` or something 
like that.

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.client.ClientAddressFinder;
+import org.apache.ignite.client.ClientAuthorizationException;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Class test ReliableChannel channels re-initialization.
+ */
+public class ReliableChannelTest {
+    /** Mock factory for creating new channels. */
+    private final Function<ClientChannelConfiguration, ClientChannel> 
chFactory = cfg -> new TestClientChannel();
+
+    /** Checks that channel holders are not reinited for static address 
configuration. */
+    @Test
+    public void testChannelsNotReinitForStaticAddressConfiguration() {
+        ClientConfiguration ccfg = new ClientConfiguration()
+            .setAddresses("127.0.0.1:8000", "127.0.0.1:8001", 
"127.0.0.1:8002");

Review comment:
       Let's use the standard port for thin clients (10800). Yes, I know it's 
just a dummy address, but someone could misunderstand it when skimming the test.

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.client.ClientAddressFinder;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.processors.odbc.ClientListenerProcessor.CLIENT_LISTENER_PORT;
+
+/**
+ * Test partition awareness of thin client on changed topology.
+ */
+public class ThinClientPartitionAwarenessDiscoveryTest extends 
ThinClientAbstractPartitionAwarenessTest {
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Test that client use channels to all running nodes while new nodes start
+     */
+    @Test
+    public void testClientDiscoveryNodesJoin() throws Exception {
+        for (int i = 0; i < MAX_CLUSTER_SIZE; ++i) {
+            startGrid(i);
+            awaitPartitionMapExchange();
+
+            int[] workChannels = IntStream.rangeClosed(0, i).toArray();
+
+            if (i == 0)
+                initClient(getClientConfigurationWithDiscovery(), 
workChannels);
+            else {
+                detectTopologyChange();
+                awaitChannelsInit(workChannels);
+            }
+
+            testPartitionAwareness(workChannels);
+        }
+    }
+
+    /**
+     * Test that client use channels to all running nodes while nodes stop

Review comment:
       Add a period at the end of the sentence.

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.client.ClientAddressFinder;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.processors.odbc.ClientListenerProcessor.CLIENT_LISTENER_PORT;
+
+/**
+ * Test partition awareness of thin client on changed topology.
+ */
+public class ThinClientPartitionAwarenessDiscoveryTest extends 
ThinClientAbstractPartitionAwarenessTest {
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Test that client use channels to all running nodes while new nodes start
+     */
+    @Test
+    public void testClientDiscoveryNodesJoin() throws Exception {
+        for (int i = 0; i < MAX_CLUSTER_SIZE; ++i) {
+            startGrid(i);
+            awaitPartitionMapExchange();
+
+            int[] workChannels = IntStream.rangeClosed(0, i).toArray();
+
+            if (i == 0)
+                initClient(getClientConfigurationWithDiscovery(), 
workChannels);
+            else {
+                detectTopologyChange();
+                awaitChannelsInit(workChannels);
+            }
+
+            testPartitionAwareness(workChannels);
+        }
+    }
+
+    /**
+     * Test that client use channels to all running nodes while nodes stop
+     */
+    @Test
+    public void testClientDiscoveryNodesLeave() throws Exception {

Review comment:
       This test doesn't check discovery at all. If we replace 
`getClientConfigurationWithDiscovery()` with `getClientConfiguration(0, 1, 2, 
3)`, the result will be the same: the test will pass. To check discovery, I 
think we should exclude an additional node via discovery (without stopping the 
node) on topology change and check that this node is excluded from requests too.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -379,80 +351,65 @@ private boolean affinityInfoIsUpToDate(int cacheId) {
         return ranges.stream()
             .flatMap(r -> IntStream
                 .rangeClosed(r.portFrom(), r.portTo()).boxed()
-                .map(p -> new InetSocketAddress(r.host(), p))
+                .map(p -> InetSocketAddress.createUnresolved(r.host(), p))
             )
-            .collect(Collectors.toList());
-    }
-
-    /** */
-    private synchronized ClientChannel channel() {
-        if (closed)
-            throw new ClientException("Channel is closed");
-
-        try {
-            return channels[curChIdx].getOrCreateChannel();
-        }
-        catch (ClientConnectionException e) {
-            rollCurrentChannel();
-
-            throw e;
-        }
-    }
-
-    /** */
-    private synchronized void rollCurrentChannel() {
-        if (++curChIdx >= channels.length)
-            curChIdx = 0;
+            .collect(Collectors.toSet());
     }
 
     /**
-     * On current channel failure.
+     * Roll current default channel if specified holder equals to it.
      */
-    private synchronized void onChannelFailure(ClientChannel ch) {
-        // There is nothing wrong if curChIdx was concurrently changed, since 
channel was closed by another thread
-        // when current index was changed and no other wrong channel will be 
closed by current thread because
-        // onChannelFailure checks channel binded to the holder before closing 
it.
-        onChannelFailure(channels[curChIdx], ch);
-
-        chFailLsnrs.forEach(Runnable::run);
+    private void rollCurrentChannel(ClientChannelHolder hld) {
+        curChannelsGuard.writeLock().lock();
+        try {
+            int idx = curChIdx;
+            List<ClientChannelHolder> holders = channels;
+
+            ClientChannelHolder dfltHld = holders.get(idx);
+            if (dfltHld == hld) {
+                idx += 1;
+                if (idx >= holders.size())
+                    curChIdx = 0;
+                else
+                    curChIdx = idx;
+            }
+        } finally {
+            curChannelsGuard.writeLock().unlock();
+        }
     }
 
     /**
      * On channel of the specified holder failure.
      */
-    private synchronized void onChannelFailure(ClientChannelHolder hld, 
ClientChannel ch) {
-        if (ch == hld.ch && ch != null) {
+    private void onChannelFailure(ClientChannelHolder hld, ClientChannel ch) {
+        if (hld != null && ch != null && ch == hld.ch)

Review comment:
       `hld` can't be null here

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -114,46 +135,32 @@
         Function<ClientChannelConfiguration, ClientChannel> chFactory,
         ClientConfiguration clientCfg,
         IgniteBinary binary
-    ) throws ClientException {
+    ) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
-
-        channels = new ClientChannelHolder[addrs.size()];
-
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
-
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
-
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
+        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled();
 
         affinityCtx = new ClientCacheAffinityContext(binary);
+    }
 
-        ClientConnectionException lastEx = null;
-
-        for (int i = 0; i < channels.length; i++) {
-            try {
-                channels[curChIdx].getOrCreateChannel();
-
-                if (partitionAwarenessEnabled)
-                    initAllChannelsAsync();
-
-                return;
-            } catch (ClientConnectionException e) {
-                lastEx = e;
-
-                rollCurrentChannel();
-            }
-        }
-
-        throw lastEx;
+    /**
+     * Establishing connections to servers. If partition awareness feature is 
enabled connections are created
+     * for every configured server. Otherwise only default channel is 
connected.
+     */
+    void initConnection() {
+        channelsInit(false);

Review comment:
       Here and below, please use the Ignite code style (see 
https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines `Semantic 
Units` section).

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -519,29 +689,44 @@ private boolean applyReconnectionThrottling() {
         /**
          * Get or create channel.
          */
-        private synchronized ClientChannel getOrCreateChannel()
+        private ClientChannel getOrCreateChannel()
             throws ClientConnectionException, ClientAuthenticationException, 
ClientProtocolError {
             return getOrCreateChannel(false);
         }
 
         /**
          * Get or create channel.
          */
-        private synchronized ClientChannel getOrCreateChannel(boolean 
ignoreThrottling)
+        private ClientChannel getOrCreateChannel(boolean ignoreThrottling)
             throws ClientConnectionException, ClientAuthenticationException, 
ClientProtocolError {
-            if (ch == null) {
-                if (!ignoreThrottling && applyReconnectionThrottling())
-                    throw new ClientConnectionException("Reconnect is not 
allowed due to applied throttling");
+            if (ch == null && !close) {
+                synchronized (this) {
+                    if (close)
+                        return null;
+
+                    if (ch != null)
+                        return ch;
 
-                ch = chFactory.apply(chCfg);
+                    if (!ignoreThrottling && applyReconnectionThrottling())
+                        throw new ClientConnectionException("Reconnect is not 
allowed due to applied throttling");
 
-                if (ch.serverNodeId() != null) {
-                    
ch.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged);
-                    ch.addNotificationListener(ReliableChannel.this);
+                    ClientChannel channel = chFactory.apply(chCfg);
 
-                    nodeChannels.values().remove(this);
+                    if (channel.serverNodeId() != null) {
+                        
channel.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged);
+                        channel.addNotificationListener(ReliableChannel.this);
 
-                    nodeChannels.putIfAbsent(ch.serverNodeId(), this);
+                        if (serverNodeId == null)
+                            serverNodeId = channel.serverNodeId();
+
+                        if (serverNodeId != null && serverNodeId != 
channel.serverNodeId())
+                            nodeChannels.remove(serverNodeId);

Review comment:
       `nodeChannels.remove(serverNodeId, this)`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -561,5 +746,24 @@ private synchronized void closeChannel() {
                 ch = null;
             }
         }
+
+        /** Close holder. */
+        void close() {
+            close = true;
+            if (serverNodeId != null)
+                nodeChannels.remove(serverNodeId);
+
+            closeChannel();
+        }
+
+        /** Wheteher the holder is closed. For test purposes. */
+        boolean isClosed() {
+            return close;
+        }
+    }
+
+    /** Get holders reference. For test purposes. */
+    List<ClientChannelHolder> getChannelHolders() {

Review comment:
       Let's make `channels` package-private and use it in tests directly (or 
make `nodeChannels` private and create the same kind of getter for it).

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -519,29 +689,44 @@ private boolean applyReconnectionThrottling() {
         /**
          * Get or create channel.
          */
-        private synchronized ClientChannel getOrCreateChannel()
+        private ClientChannel getOrCreateChannel()
             throws ClientConnectionException, ClientAuthenticationException, 
ClientProtocolError {
             return getOrCreateChannel(false);
         }
 
         /**
          * Get or create channel.
          */
-        private synchronized ClientChannel getOrCreateChannel(boolean 
ignoreThrottling)
+        private ClientChannel getOrCreateChannel(boolean ignoreThrottling)
             throws ClientConnectionException, ClientAuthenticationException, 
ClientProtocolError {
-            if (ch == null) {
-                if (!ignoreThrottling && applyReconnectionThrottling())
-                    throw new ClientConnectionException("Reconnect is not 
allowed due to applied throttling");
+            if (ch == null && !close) {
+                synchronized (this) {
+                    if (close)
+                        return null;
+
+                    if (ch != null)
+                        return ch;
 
-                ch = chFactory.apply(chCfg);
+                    if (!ignoreThrottling && applyReconnectionThrottling())
+                        throw new ClientConnectionException("Reconnect is not 
allowed due to applied throttling");
 
-                if (ch.serverNodeId() != null) {
-                    
ch.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged);
-                    ch.addNotificationListener(ReliableChannel.this);
+                    ClientChannel channel = chFactory.apply(chCfg);
 
-                    nodeChannels.values().remove(this);
+                    if (channel.serverNodeId() != null) {
+                        
channel.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged);
+                        channel.addNotificationListener(ReliableChannel.this);
 
-                    nodeChannels.putIfAbsent(ch.serverNodeId(), this);
+                        if (serverNodeId == null)

Review comment:
       This `if` is redundant, since you set `serverNodeId` later, and the next 
`if` condition can't be true at the same time as this one.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -561,5 +746,24 @@ private synchronized void closeChannel() {
                 ch = null;
             }
         }
+
+        /** Close holder. */
+        void close() {
+            close = true;
+            if (serverNodeId != null)
+                nodeChannels.remove(serverNodeId);

Review comment:
       `nodeChannels.remove(serverNodeId, this)`

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.client.ClientAddressFinder;
+import org.apache.ignite.client.ClientAuthorizationException;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Class test ReliableChannel channels re-initialization.
+ */
+public class ReliableChannelTest {
+    /** Mock factory for creating new channels. */
+    private final Function<ClientChannelConfiguration, ClientChannel> 
chFactory = cfg -> new TestClientChannel();
+
+    /** Checks that channel holders are not reinited for static address 
configuration. */
+    @Test
+    public void testChannelsNotReinitForStaticAddressConfiguration() {
+        ClientConfiguration ccfg = new ClientConfiguration()
+            .setAddresses("127.0.0.1:8000", "127.0.0.1:8001", 
"127.0.0.1:8002");
+
+        checkDoesNotReinit(ccfg);
+    }
+
+    /** Checks that channel holders are not reinited if address finder return 
the same list of addresses. */
+    @Test
+    public void testChannelsNotReinitForStableDynamicAddressConfiguration() {
+        ClientConfiguration ccfg = new ClientConfiguration()
+            .setAddressesFinder(new TestAddressFinder("127.0.0.1:8000", 
"127.0.0.1:8001", "127.0.0.1:8002"));
+
+        checkDoesNotReinit(ccfg);
+    }
+
+    /** */
+    private void checkDoesNotReinit(ClientConfiguration ccfg) {
+        ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+        rc.initConnection();
+        List<ReliableChannel.ClientChannelHolder> originalChannels = 
rc.getChannelHolders();
+
+        // Imitate topology change.
+        rc.initChannelHolders(true);
+        List<ReliableChannel.ClientChannelHolder> newChannels = 
rc.getChannelHolders();
+
+        assertSame(originalChannels, newChannels);
+        IntStream.range(0, 3).forEach(i -> {
+            assertSame(originalChannels.get(i), newChannels.get(i));

Review comment:
       I didn't get this check. We have already checked that the references are 
the same, haven't we?

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.client.ClientAddressFinder;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.processors.odbc.ClientListenerProcessor.CLIENT_LISTENER_PORT;
+
+/**
+ * Test partition awareness of thin client on changed topology.
+ */
+public class ThinClientPartitionAwarenessDiscoveryTest extends 
ThinClientAbstractPartitionAwarenessTest {
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Test that client use channels to all running nodes while new nodes start
+     */
+    @Test
+    public void testClientDiscoveryNodesJoin() throws Exception {
+        for (int i = 0; i < MAX_CLUSTER_SIZE; ++i) {
+            startGrid(i);
+            awaitPartitionMapExchange();
+
+            int[] workChannels = IntStream.rangeClosed(0, i).toArray();
+
+            if (i == 0)
+                initClient(getClientConfigurationWithDiscovery(), 
workChannels);
+            else {
+                detectTopologyChange();
+                awaitChannelsInit(workChannels);
+            }
+
+            testPartitionAwareness(workChannels);
+        }
+    }
+
+    /**
+     * Test that client use channels to all running nodes while nodes stop
+     */
+    @Test
+    public void testClientDiscoveryNodesLeave() throws Exception {
+        startGrids(MAX_CLUSTER_SIZE);
+        awaitPartitionMapExchange();
+
+        initClient(getClientConfigurationWithDiscovery(), 0, 1, 2, 3);
+        detectTopologyChange();
+
+        for (int i = MAX_CLUSTER_SIZE - 1; i != 0; i--) {
+            int[] workChannels = IntStream.range(0, i).toArray();
+
+            channels[i] = null;
+            stopGrid(i);
+            awaitPartitionMapExchange();
+            detectTopologyChange();
+
+            awaitChannelsInit(workChannels);

Review comment:
       All channels are already initialized after initClient; what else are we waiting for here?

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.client.ClientAddressFinder;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.processors.odbc.ClientListenerProcessor.CLIENT_LISTENER_PORT;
+
+/**
+ * Test partition awareness of thin client on changed topology.
+ */
+public class ThinClientPartitionAwarenessDiscoveryTest extends 
ThinClientAbstractPartitionAwarenessTest {
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Test that client use channels to all running nodes while new nodes start
+     */
+    @Test
+    public void testClientDiscoveryNodesJoin() throws Exception {
+        for (int i = 0; i < MAX_CLUSTER_SIZE; ++i) {
+            startGrid(i);
+            awaitPartitionMapExchange();
+
+            int[] workChannels = IntStream.rangeClosed(0, i).toArray();
+
+            if (i == 0)
+                initClient(getClientConfigurationWithDiscovery(), 
workChannels);
+            else {
+                detectTopologyChange();
+                awaitChannelsInit(workChannels);
+            }
+
+            testPartitionAwareness(workChannels);
+        }
+    }
+
+    /**
+     * Test that client use channels to all running nodes while nodes stop
+     */
+    @Test
+    public void testClientDiscoveryNodesLeave() throws Exception {
+        startGrids(MAX_CLUSTER_SIZE);
+        awaitPartitionMapExchange();
+
+        initClient(getClientConfigurationWithDiscovery(), 0, 1, 2, 3);
+        detectTopologyChange();
+
+        for (int i = MAX_CLUSTER_SIZE - 1; i != 0; i--) {
+            int[] workChannels = IntStream.range(0, i).toArray();
+
+            channels[i] = null;
+            stopGrid(i);
+            awaitPartitionMapExchange();
+            detectTopologyChange();
+
+            awaitChannelsInit(workChannels);
+            testPartitionAwareness(workChannels);
+        }
+    }
+
+    /**
+     * Checks that each request goes to right node.
+     */
+    private void testPartitionAwareness(int... chIdxs) {
+        ClientCache<Object, Object> clientCache = 
client.cache(PART_CACHE_NAME);
+        IgniteInternalCache<Object, Object> igniteCache = 
grid(0).context().cache().cache(PART_CACHE_NAME);
+
+        Map<TestTcpClientChannel, Boolean> channelHits = 
Arrays.stream(chIdxs).boxed()
+            .collect(Collectors.toMap(idx -> channels[idx], idx -> false));
+
+        for (int i = 0; i < KEY_CNT; i++) {
+            TestTcpClientChannel opCh = affinityChannel(i, igniteCache);
+
+            clientCache.put(i, i);
+
+            if (i == 0)
+                assertOpOnChannel(dfltCh, ClientOperation.CACHE_PARTITIONS);
+
+            assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
+            assertTrue(channelHits.containsKey(opCh));
+
+            channelHits.compute(opCh, (c, old) -> true);
+        }
+
+        assertFalse(channelHits.containsValue(false));
+    }
+
+    /**
+     * Provide ClientConfiguration with addrResolver that find all alive nodes
+     */
+    private ClientConfiguration getClientConfigurationWithDiscovery() {
+        ClientAddressFinder addrFinder = () ->
+            IgnitionEx.allGrids().stream().map(node -> {
+                int port = (Integer) 
node.cluster().localNode().attributes().get(CLIENT_LISTENER_PORT);
+                return "127.0.0.1:" + port;
+            }).toArray(String[]::new);
+
+        return new ClientConfiguration()
+            .setAddressesFinder(addrFinder)
+            .setPartitionAwarenessEnabled(true);
+    }
+
+    /**
+     * Trigger client to detect topology change
+     */
+    private void detectTopologyChange() {
+        // Send non-affinity request to detect topology change.
+        initDefaultChannel();
+        client.getOrCreateCache(PART_CACHE_NAME);

Review comment:
       Why do we need a second request? The topology change should already be 
detected after the `initDefaultChannel` call.

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.client.ClientAddressFinder;
+import org.apache.ignite.client.ClientAuthorizationException;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Class test ReliableChannel channels re-initialization.
+ */
+public class ReliableChannelTest {
+    /** Mock factory for creating new channels. */
+    private final Function<ClientChannelConfiguration, ClientChannel> 
chFactory = cfg -> new TestClientChannel();
+
+    /** Checks that channel holders are not reinited for static address 
configuration. */
+    @Test
+    public void testChannelsNotReinitForStaticAddressConfiguration() {
+        ClientConfiguration ccfg = new ClientConfiguration()
+            .setAddresses("127.0.0.1:8000", "127.0.0.1:8001", 
"127.0.0.1:8002");
+
+        checkDoesNotReinit(ccfg);
+    }
+
+    /** Checks that channel holders are not reinited if address finder return 
the same list of addresses. */
+    @Test
+    public void testChannelsNotReinitForStableDynamicAddressConfiguration() {
+        ClientConfiguration ccfg = new ClientConfiguration()
+            .setAddressesFinder(new TestAddressFinder("127.0.0.1:8000", 
"127.0.0.1:8001", "127.0.0.1:8002"));
+
+        checkDoesNotReinit(ccfg);
+    }
+
+    /** */
+    private void checkDoesNotReinit(ClientConfiguration ccfg) {
+        ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+        rc.initConnection();
+        List<ReliableChannel.ClientChannelHolder> originalChannels = 
rc.getChannelHolders();
+
+        // Imitate topology change.
+        rc.initChannelHolders(true);
+        List<ReliableChannel.ClientChannelHolder> newChannels = 
rc.getChannelHolders();
+
+        assertSame(originalChannels, newChannels);
+        IntStream.range(0, 3).forEach(i -> {
+            assertSame(originalChannels.get(i), newChannels.get(i));
+            assertFalse(originalChannels.get(i).isClosed());
+        });
+        assertEquals(3, newChannels.size());
+    }
+
+    /** Checks that node channels are persisted if channels are reinit with 
static address configuration. */
+    @Test
+    public void testNodeChannelsAreNotCleaned() {
+        ClientConfiguration ccfg = new ClientConfiguration()
+            .setAddresses("127.0.0.1:8000", "127.0.0.1:8001", 
"127.0.0.1:8002");
+
+        ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+        rc.initConnection();
+        // Trigger TestClientChannel creation.
+        rc.service(null, null, null);
+
+        assertEquals(1, rc.nodeChannels.size());
+
+        // Imitate topology change.
+        rc.initChannelHolders(true);
+
+        assertEquals(1, rc.nodeChannels.size());
+    }
+
+    /** Checks that channels are changed (add new, remove old) and close 
channels if reinitialization performed. */
+    @Test
+    public void testDynamicAddressReinitializedCorrectly() {
+        ClientConfiguration ccfg = new ClientConfiguration()
+            .setAddressesFinder(new TestAddressFinder("127.0.0.1:8000", 
"127.0.0.1:8003"));
+
+        ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+        rc.initConnection();
+
+        List<ReliableChannel.ClientChannelHolder> originChannels = 
Collections.unmodifiableList(rc.getChannelHolders());
+        // Imitate topology change.
+        rc.initChannelHolders(true);
+
+        List<ReliableChannel.ClientChannelHolder> closedChannels = 
originChannels.stream()
+            .filter(r -> r.isClosed())
+            .collect(Collectors.toList());
+
+        assertEquals(2, closedChannels.size());
+
+        List<ReliableChannel.ClientChannelHolder> reuseChannel = 
originChannels.stream()
+            .filter(c -> !c.isClosed())
+            .collect(Collectors.toList());
+
+        assertEquals(1, reuseChannel.size());
+
+        List<ReliableChannel.ClientChannelHolder> newChannels = 
rc.getChannelHolders();
+        assertEquals(2, newChannels.size());
+        assertTrue(newChannels.get(0) == reuseChannel.get(0) || 
newChannels.get(1) == reuseChannel.get(0));

Review comment:
       Why not use `newChannels.contains(reuseChannel.get(0))` here?

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.client.ClientAddressFinder;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.processors.odbc.ClientListenerProcessor.CLIENT_LISTENER_PORT;
+
+/**
+ * Test partition awareness of thin client on changed topology.
+ */
+public class ThinClientPartitionAwarenessDiscoveryTest extends 
ThinClientAbstractPartitionAwarenessTest {
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Test that client use channels to all running nodes while new nodes start

Review comment:
       Please add a period at the end of the sentence.

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.client.ClientAddressFinder;
+import org.apache.ignite.client.ClientAuthorizationException;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Class test ReliableChannel channels re-initialization.
+ */
+public class ReliableChannelTest {
+    /** Mock factory for creating new channels. */
+    private final Function<ClientChannelConfiguration, ClientChannel> 
chFactory = cfg -> new TestClientChannel();
+
+    /** Checks that channel holders are not reinited for static address 
configuration. */
+    @Test
+    public void testChannelsNotReinitForStaticAddressConfiguration() {
+        ClientConfiguration ccfg = new ClientConfiguration()
+            .setAddresses("127.0.0.1:8000", "127.0.0.1:8001", 
"127.0.0.1:8002");
+
+        checkDoesNotReinit(ccfg);
+    }
+
+    /** Checks that channel holders are not reinited if address finder return 
the same list of addresses. */
+    @Test
+    public void testChannelsNotReinitForStableDynamicAddressConfiguration() {
+        ClientConfiguration ccfg = new ClientConfiguration()
+            .setAddressesFinder(new TestAddressFinder("127.0.0.1:8000", 
"127.0.0.1:8001", "127.0.0.1:8002"));
+
+        checkDoesNotReinit(ccfg);
+    }
+
+    /** */
+    private void checkDoesNotReinit(ClientConfiguration ccfg) {
+        ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+        rc.initConnection();
+        List<ReliableChannel.ClientChannelHolder> originalChannels = 
rc.getChannelHolders();
+
+        // Imitate topology change.
+        rc.initChannelHolders(true);
+        List<ReliableChannel.ClientChannelHolder> newChannels = 
rc.getChannelHolders();
+
+        assertSame(originalChannels, newChannels);
+        IntStream.range(0, 3).forEach(i -> {

Review comment:
       IMO a plain `for` loop would be more readable here.

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.client.ClientAddressFinder;
+import org.apache.ignite.client.ClientAuthorizationException;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Class test ReliableChannel channels re-initialization.
+ */
+public class ReliableChannelTest {
+    /** Mock factory for creating new channels. */
+    private final Function<ClientChannelConfiguration, ClientChannel> 
chFactory = cfg -> new TestClientChannel();
+
+    /** Checks that channel holders are not reinited for static address 
configuration. */
+    @Test
+    public void testChannelsNotReinitForStaticAddressConfiguration() {
+        ClientConfiguration ccfg = new ClientConfiguration()
+            .setAddresses("127.0.0.1:8000", "127.0.0.1:8001", 
"127.0.0.1:8002");
+
+        checkDoesNotReinit(ccfg);
+    }
+
+    /** Checks that channel holders are not reinited if address finder return 
the same list of addresses. */
+    @Test
+    public void testChannelsNotReinitForStableDynamicAddressConfiguration() {
+        ClientConfiguration ccfg = new ClientConfiguration()
+            .setAddressesFinder(new TestAddressFinder("127.0.0.1:8000", 
"127.0.0.1:8001", "127.0.0.1:8002"));
+
+        checkDoesNotReinit(ccfg);
+    }
+
+    /** */
+    private void checkDoesNotReinit(ClientConfiguration ccfg) {
+        ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+        rc.initConnection();
+        List<ReliableChannel.ClientChannelHolder> originalChannels = 
rc.getChannelHolders();
+
+        // Imitate topology change.
+        rc.initChannelHolders(true);
+        List<ReliableChannel.ClientChannelHolder> newChannels = 
rc.getChannelHolders();
+
+        assertSame(originalChannels, newChannels);
+        IntStream.range(0, 3).forEach(i -> {
+            assertSame(originalChannels.get(i), newChannels.get(i));
+            assertFalse(originalChannels.get(i).isClosed());
+        });
+        assertEquals(3, newChannels.size());
+    }
+
+    /** Checks that node channels are persisted if channels are reinit with 
static address configuration. */
+    @Test
+    public void testNodeChannelsAreNotCleaned() {
+        ClientConfiguration ccfg = new ClientConfiguration()
+            .setAddresses("127.0.0.1:8000", "127.0.0.1:8001", 
"127.0.0.1:8002");
+
+        ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+        rc.initConnection();
+        // Trigger TestClientChannel creation.
+        rc.service(null, null, null);
+
+        assertEquals(1, rc.nodeChannels.size());
+
+        // Imitate topology change.
+        rc.initChannelHolders(true);
+
+        assertEquals(1, rc.nodeChannels.size());
+    }
+
+    /** Checks that channels are changed (add new, remove old) and close 
channels if reinitialization performed. */
+    @Test
+    public void testDynamicAddressReinitializedCorrectly() {
+        ClientConfiguration ccfg = new ClientConfiguration()
+            .setAddressesFinder(new TestAddressFinder("127.0.0.1:8000", 
"127.0.0.1:8003"));
+
+        ReliableChannel rc = new ReliableChannel(chFactory, ccfg, null);
+        rc.initConnection();
+
+        List<ReliableChannel.ClientChannelHolder> originChannels = 
Collections.unmodifiableList(rc.getChannelHolders());
+        // Imitate topology change.
+        rc.initChannelHolders(true);
+
+        List<ReliableChannel.ClientChannelHolder> closedChannels = 
originChannels.stream()
+            .filter(r -> r.isClosed())
+            .collect(Collectors.toList());
+
+        assertEquals(2, closedChannels.size());

Review comment:
       `assertEquals(2, F.size(originChannels, r -> r.isClosed()));`

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.client.ClientAddressFinder;
+import org.apache.ignite.client.ClientAuthorizationException;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Class test ReliableChannel channels re-initialization.
+ */
+public class ReliableChannelTest {
+    /** Mock factory for creating new channels. */
+    private final Function<ClientChannelConfiguration, ClientChannel> 
chFactory = cfg -> new TestClientChannel();
+
+    /** Checks that channel holders are not reinited for static address 
configuration. */
+    @Test
+    public void testChannelsNotReinitForStaticAddressConfiguration() {
+        ClientConfiguration ccfg = new ClientConfiguration()
+            .setAddresses("127.0.0.1:8000", "127.0.0.1:8001", 
"127.0.0.1:8002");
+
+        checkDoesNotReinit(ccfg);
+    }
+
+    /** Checks that channel holders are not reinited if address finder return 
the same list of addresses. */
+    @Test
+    public void testChannelsNotReinitForStableDynamicAddressConfiguration() {
+        ClientConfiguration ccfg = new ClientConfiguration()
+            .setAddressesFinder(new TestAddressFinder("127.0.0.1:8000", 
"127.0.0.1:8001", "127.0.0.1:8002"));

Review comment:
       Without looking at the `TestAddressFinder` implementation, it is not 
obvious that the same set of addresses is returned each time. Let's make 
`TestAddressFinder` more generic, backed by a Queue, and use it something 
like this: 
   `new TestAddressFinder().add("127.0.0.1:8000", "127.0.0.1:8001", 
"127.0.0.1:8002").add("127.0.0.1:8000", "127.0.0.1:8001", "127.0.0.1:8002")`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to