alex-plekhanov commented on a change in pull request #8206:
URL: https://github.com/apache/ignite/pull/8206#discussion_r487017283



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -114,46 +126,32 @@
         Function<ClientChannelConfiguration, ClientChannel> chFactory,
         ClientConfiguration clientCfg,
         IgniteBinary binary
-    ) throws ClientException {
+    ) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
-
-        channels = new ClientChannelHolder[addrs.size()];
-
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled();
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
-
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
-
-        affinityCtx = new ClientCacheAffinityContext(binary);
-
-        ClientConnectionException lastEx = null;
-
-        for (int i = 0; i < channels.length; i++) {
-            try {
-                channels[curChIdx].getOrCreateChannel();
-
-                if (partitionAwarenessEnabled)
-                    initAllChannelsAsync();
-
-                return;
-            } catch (ClientConnectionException e) {
-                lastEx = e;
-
-                rollCurrentChannel();
-            }
-        }
+        affCtx = new ClientCacheAffinityContext(binary);
+    }
 
-        throw lastEx;
+    /**
+     * Establishing connections to servers. If partition awareness feature is 
enabled connections are created
+     * for every configured server. Otherwise only default channel is 
connected.
+     */
+    void initConnection() {
+        channelsInit(false);
+        if (!partitionAwarenessEnabled)
+            applyOnDefaultChannel(channel -> {
+                // do nothing, just trigger channel connection.

Review comment:
       Here and below, please use Ignite code style for comments (start with 
an uppercase letter, end with a period).

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());

Review comment:
       Can we skip the further steps if `clientCfg.getAddresses()` has not 
changed? (For example, with a static address configuration you will always 
reinit holders, which is redundant.)

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -356,7 +318,7 @@ private boolean affinityInfoIsUpToDate(int cacheId) {
     /**
      * @return host:port_range address lines parsed as {@link 
InetSocketAddress}.
      */
-    private static List<InetSocketAddress> parseAddresses(String[] addrs) 
throws ClientException {
+    private Set<InetSocketAddress> parseAddresses(String[] addrs) throws 
ClientException {

Review comment:
       Let's keep it `static`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -98,265 +101,291 @@
     /** Channels reinit was scheduled. */
     private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
 
-    /** Affinity map update is in progress. */
-    private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
-
     /** Channel is closed. */
     private volatile boolean closed;
 
     /** Fail (disconnect) listeners. */
-    private ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+    private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
 
-    /**
-     * Constructor.
-     */
-    ReliableChannel(
-        Function<ClientChannelConfiguration, ClientChannel> chFactory,
-        ClientConfiguration clientCfg,
-        IgniteBinary binary
-    ) throws ClientException {
+    /** Fail (disconnect) listeners. */
+    private final ArrayList<Predicate<ClientChannel>> topChangeFilters = new 
ArrayList<>();
+
+    /** Guard channels and curChIdx together. */
+    private final ReadWriteLock curChannelsGuard = new 
ReentrantReadWriteLock();
+
+    /** Constructor. */
+    ReliableChannel(ClientConfiguration clientCfg,
+                    Function<ClientChannelConfiguration, ClientChannel> 
chFactory,
+                    boolean initAllChannels) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
+        this.initAllChannels = initAllChannels;
+    }
+
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
+    /** Callback is invoked after new ClientChannel has created. */
+    private final BiConsumer<ClientChannelHolder, ClientChannel> 
onChannelCreate = (holder, ch) -> {
+        ch.addTopologyChangeListener(channel -> {
+            if (topChangeFilters.stream().allMatch(s -> s.test(channel)))
+                channelsInit(true);
+        });
 
-        channels = new ClientChannelHolder[addrs.size()];
+        ch.addNotificationListener(this);
 
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        nodeChannels.values().remove(holder);
+        nodeChannels.put(ch.serverNodeId(), holder);
+    };
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
+    /** Callback is invoked after a ClientChannel has closed. */
+    private final Consumer<ClientChannel> onChannelClose = ch -> {
+        for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+            lsnr.accept(ch);
+    };
 
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        List<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))

Review comment:
       I'm not sure that replacing `List` with `Set` is a good idea here 
(perhaps you should just copy the list to a set locally here). With a `List` you 
can solve some user problems which can't be solved with a `Set`. For example, 
with a list you can make some server nodes be used more often than others. Also, 
by having a single duplicated address you can work around some known issues.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());

Review comment:
       `new ArrayList<>()` -> `Collections.emptyList()`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -98,265 +101,291 @@
     /** Channels reinit was scheduled. */
     private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
 
-    /** Affinity map update is in progress. */
-    private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
-
     /** Channel is closed. */
     private volatile boolean closed;
 
     /** Fail (disconnect) listeners. */
-    private ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+    private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
 
-    /**
-     * Constructor.
-     */
-    ReliableChannel(
-        Function<ClientChannelConfiguration, ClientChannel> chFactory,
-        ClientConfiguration clientCfg,
-        IgniteBinary binary
-    ) throws ClientException {
+    /** Fail (disconnect) listeners. */
+    private final ArrayList<Predicate<ClientChannel>> topChangeFilters = new 
ArrayList<>();
+
+    /** Guard channels and curChIdx together. */
+    private final ReadWriteLock curChannelsGuard = new 
ReentrantReadWriteLock();
+
+    /** Constructor. */
+    ReliableChannel(ClientConfiguration clientCfg,
+                    Function<ClientChannelConfiguration, ClientChannel> 
chFactory,
+                    boolean initAllChannels) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
+        this.initAllChannels = initAllChannels;
+    }
+
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
+    /** Callback is invoked after new ClientChannel has created. */
+    private final BiConsumer<ClientChannelHolder, ClientChannel> 
onChannelCreate = (holder, ch) -> {
+        ch.addTopologyChangeListener(channel -> {
+            if (topChangeFilters.stream().allMatch(s -> s.test(channel)))
+                channelsInit(true);
+        });
 
-        channels = new ClientChannelHolder[addrs.size()];
+        ch.addNotificationListener(this);
 
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        nodeChannels.values().remove(holder);
+        nodeChannels.put(ch.serverNodeId(), holder);
+    };
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
+    /** Callback is invoked after a ClientChannel has closed. */
+    private final Consumer<ClientChannel> onChannelClose = ch -> {
+        for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+            lsnr.accept(ch);
+    };
 
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        List<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))
+            .forEach(addr -> addrs.get(addr).setValue(true));
+
+        // create new holders for new addrs
+        resolvedAddrs.stream()
+            .filter(addr -> !addrs.containsKey(addr))
+            .forEach(addr -> {
+                ClientChannelHolder hld = new ClientChannelHolder(
+                    new ClientChannelConfiguration(clientCfg, addr), 
chFactory, onChannelCreate, onChannelClose);
+
+                addrs.put(addr, new T2<>(hld, false));
+            });
+
+        if (!stopInitCondition()) {
+            List<ClientChannelHolder> list = new ArrayList<>();
+            // The variable holds a new index of default channel after 
topology change.
+            // Suppose that reuse of the channel is better than open new 
connection.
+            int dfltChannelIdx = -1;
+
+            ClientChannelHolder currHolder = null;
+            if (curChIdx != -1)
+                currHolder = channels.get().get(curChIdx);
+
+            for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+                ClientChannelHolder hld = t.get1();
+                Boolean markForDelete = t.get2();
+
+                if (markForDelete == null) {
+                    // this channel is still in use
+                    list.add(hld);
+                    if (hld == currHolder)
+                        dfltChannelIdx = list.size() - 1;
 
-        affinityCtx = new ClientCacheAffinityContext(binary);
+                }
+                else if (markForDelete) {
+                    // this holder should be deleted now
+                    nodeChannels.values().remove(hld);
+                    hld.close();
+                }
+                else {
+                    // this channel is new
+                    list.add(hld);
+                }
+            }
 
-        ClientConnectionException lastEx = null;
+            if (dfltChannelIdx == -1)
+                dfltChannelIdx = new Random().nextInt(list.size());
 
-        for (int i = 0; i < channels.length; i++) {
+            curChannelsGuard.writeLock().lock();

Review comment:
       Agree with Pavel: we don't need `AtomicReference`. It's too verbose and 
we don't use any atomic features (no CAS operations, only get and set). It can 
be replaced with a volatile variable (List or array).

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))
+            .forEach(addr -> addrs.get(addr).setValue(true));
+
+        // create new holders for new addrs
+        resolvedAddrs.stream()
+            .filter(addr -> !addrs.containsKey(addr))
+            .forEach(addr -> {
+                ClientChannelHolder hld = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addr));
+                addrs.put(addr, new T2<>(hld, false));
+            });
+
+        if (!stopInitCondition()) {
+            List<ClientChannelHolder> list = new ArrayList<>();
+            // The variable holds a new index of default channel after 
topology change.
+            // Suppose that reuse of the channel is better than open new 
connection.
+            int dfltChannelIdx = -1;
+
+            ClientChannelHolder currHolder = null;
+            if (curChIdx != -1)
+                currHolder = channels.get().get(curChIdx);
+
+            for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+                ClientChannelHolder hld = t.get1();
+                Boolean markForDelete = t.get2();
+
+                if (markForDelete == null) {
+                    // this channel is still in use
+                    list.add(hld);
+                    if (hld == currHolder)
+                        dfltChannelIdx = list.size() - 1;
+
+                }
+                else if (markForDelete) {
+                    // this holder should be deleted now
+                    nodeChannels.values().remove(hld);
+                    hld.close();
+                }
+                else {
+                    // this channel is new
+                    list.add(hld);
+                }
+            }
+
+            if (dfltChannelIdx == -1)
+                dfltChannelIdx = new Random().nextInt(list.size());
+
+            curChannelsGuard.writeLock().lock();
+            try {
+                channels.set(list);
+                curChIdx = dfltChannelIdx;
+            } finally {
+                curChannelsGuard.writeLock().unlock();
+            }
+        }
+    }
+
+    /** Initialization of channels. */
+    private void channelsInit(boolean force) {

Review comment:
       The `force` flag is only set when partition awareness is used, but I 
think sometimes we should reinit holders even without partition awareness. For 
example, if we use Kubernetes discovery and new servers were started, the 
client will use only the addresses of server nodes that were available at the 
time of client start.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()

Review comment:
       Perhaps the code would be much simpler with a regular loop over holders, 
without streams and intermediate structures.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -114,46 +126,32 @@
         Function<ClientChannelConfiguration, ClientChannel> chFactory,
         ClientConfiguration clientCfg,
         IgniteBinary binary
-    ) throws ClientException {
+    ) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
-
-        channels = new ClientChannelHolder[addrs.size()];
-
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled();
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
-
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
-
-        affinityCtx = new ClientCacheAffinityContext(binary);
-
-        ClientConnectionException lastEx = null;
-
-        for (int i = 0; i < channels.length; i++) {
-            try {
-                channels[curChIdx].getOrCreateChannel();
-
-                if (partitionAwarenessEnabled)
-                    initAllChannelsAsync();
-
-                return;
-            } catch (ClientConnectionException e) {
-                lastEx = e;
-
-                rollCurrentChannel();
-            }
-        }
+        affCtx = new ClientCacheAffinityContext(binary);
+    }
 
-        throw lastEx;
+    /**
+     * Establishing connections to servers. If partition awareness feature is 
enabled connections are created
+     * for every configured server. Otherwise only default channel is 
connected.
+     */
+    void initConnection() {
+        channelsInit(false);
+        if (!partitionAwarenessEnabled)
+            applyOnDefaultChannel(channel -> {
+                // do nothing, just trigger channel connection.

Review comment:
       Here and below, please use Ignite code style for comments (start with 
an uppercase letter, end with a period).

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());

Review comment:
       Can we skip the further steps if `clientCfg.getAddresses()` has not 
changed? (For example, with a static address configuration you will always 
reinit holders, which is redundant.)

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -356,7 +318,7 @@ private boolean affinityInfoIsUpToDate(int cacheId) {
     /**
      * @return host:port_range address lines parsed as {@link 
InetSocketAddress}.
      */
-    private static List<InetSocketAddress> parseAddresses(String[] addrs) 
throws ClientException {
+    private Set<InetSocketAddress> parseAddresses(String[] addrs) throws 
ClientException {

Review comment:
       Let's keep it `static`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -98,265 +101,291 @@
     /** Channels reinit was scheduled. */
     private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
 
-    /** Affinity map update is in progress. */
-    private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
-
     /** Channel is closed. */
     private volatile boolean closed;
 
     /** Fail (disconnect) listeners. */
-    private ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+    private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
 
-    /**
-     * Constructor.
-     */
-    ReliableChannel(
-        Function<ClientChannelConfiguration, ClientChannel> chFactory,
-        ClientConfiguration clientCfg,
-        IgniteBinary binary
-    ) throws ClientException {
+    /** Fail (disconnect) listeners. */
+    private final ArrayList<Predicate<ClientChannel>> topChangeFilters = new 
ArrayList<>();
+
+    /** Guard channels and curChIdx together. */
+    private final ReadWriteLock curChannelsGuard = new 
ReentrantReadWriteLock();
+
+    /** Constructor. */
+    ReliableChannel(ClientConfiguration clientCfg,
+                    Function<ClientChannelConfiguration, ClientChannel> 
chFactory,
+                    boolean initAllChannels) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
+        this.initAllChannels = initAllChannels;
+    }
+
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
+    /** Callback is invoked after new ClientChannel has created. */
+    private final BiConsumer<ClientChannelHolder, ClientChannel> 
onChannelCreate = (holder, ch) -> {
+        ch.addTopologyChangeListener(channel -> {
+            if (topChangeFilters.stream().allMatch(s -> s.test(channel)))
+                channelsInit(true);
+        });
 
-        channels = new ClientChannelHolder[addrs.size()];
+        ch.addNotificationListener(this);
 
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        nodeChannels.values().remove(holder);
+        nodeChannels.put(ch.serverNodeId(), holder);
+    };
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
+    /** Callback is invoked after a ClientChannel has closed. */
+    private final Consumer<ClientChannel> onChannelClose = ch -> {
+        for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+            lsnr.accept(ch);
+    };
 
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        List<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))

Review comment:
       I'm not sure that replacing `List` with `Set` is a good idea here 
(perhaps you should just copy the list to a set locally here). With a `List` you 
can solve some user problems which can't be solved with a `Set`. For example, 
with a list you can make some server nodes be used more often than others. Also, 
by having a single duplicated address you can work around some known issues.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());

Review comment:
       `new ArrayList<>()` -> `Collections.emptyList()`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -98,265 +101,291 @@
     /** Channels reinit was scheduled. */
     private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
 
-    /** Affinity map update is in progress. */
-    private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
-
     /** Channel is closed. */
     private volatile boolean closed;
 
     /** Fail (disconnect) listeners. */
-    private ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+    private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
 
-    /**
-     * Constructor.
-     */
-    ReliableChannel(
-        Function<ClientChannelConfiguration, ClientChannel> chFactory,
-        ClientConfiguration clientCfg,
-        IgniteBinary binary
-    ) throws ClientException {
+    /** Fail (disconnect) listeners. */
+    private final ArrayList<Predicate<ClientChannel>> topChangeFilters = new 
ArrayList<>();
+
+    /** Guard channels and curChIdx together. */
+    private final ReadWriteLock curChannelsGuard = new 
ReentrantReadWriteLock();
+
+    /** Constructor. */
+    ReliableChannel(ClientConfiguration clientCfg,
+                    Function<ClientChannelConfiguration, ClientChannel> 
chFactory,
+                    boolean initAllChannels) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
+        this.initAllChannels = initAllChannels;
+    }
+
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
+    /** Callback is invoked after new ClientChannel has created. */
+    private final BiConsumer<ClientChannelHolder, ClientChannel> 
onChannelCreate = (holder, ch) -> {
+        ch.addTopologyChangeListener(channel -> {
+            if (topChangeFilters.stream().allMatch(s -> s.test(channel)))
+                channelsInit(true);
+        });
 
-        channels = new ClientChannelHolder[addrs.size()];
+        ch.addNotificationListener(this);
 
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        nodeChannels.values().remove(holder);
+        nodeChannels.put(ch.serverNodeId(), holder);
+    };
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
+    /** Callback is invoked after a ClientChannel has closed. */
+    private final Consumer<ClientChannel> onChannelClose = ch -> {
+        for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+            lsnr.accept(ch);
+    };
 
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        List<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))
+            .forEach(addr -> addrs.get(addr).setValue(true));
+
+        // create new holders for new addrs
+        resolvedAddrs.stream()
+            .filter(addr -> !addrs.containsKey(addr))
+            .forEach(addr -> {
+                ClientChannelHolder hld = new ClientChannelHolder(
+                    new ClientChannelConfiguration(clientCfg, addr), 
chFactory, onChannelCreate, onChannelClose);
+
+                addrs.put(addr, new T2<>(hld, false));
+            });
+
+        if (!stopInitCondition()) {
+            List<ClientChannelHolder> list = new ArrayList<>();
+            // The variable holds a new index of default channel after 
topology change.
+            // Suppose that reuse of the channel is better than open new 
connection.
+            int dfltChannelIdx = -1;
+
+            ClientChannelHolder currHolder = null;
+            if (curChIdx != -1)
+                currHolder = channels.get().get(curChIdx);
+
+            for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+                ClientChannelHolder hld = t.get1();
+                Boolean markForDelete = t.get2();
+
+                if (markForDelete == null) {
+                    // this channel is still in use
+                    list.add(hld);
+                    if (hld == currHolder)
+                        dfltChannelIdx = list.size() - 1;
 
-        affinityCtx = new ClientCacheAffinityContext(binary);
+                }
+                else if (markForDelete) {
+                    // this holder should be deleted now
+                    nodeChannels.values().remove(hld);
+                    hld.close();
+                }
+                else {
+                    // this channel is new
+                    list.add(hld);
+                }
+            }
 
-        ClientConnectionException lastEx = null;
+            if (dfltChannelIdx == -1)
+                dfltChannelIdx = new Random().nextInt(list.size());
 
-        for (int i = 0; i < channels.length; i++) {
+            curChannelsGuard.writeLock().lock();

Review comment:
       Agree with Pavel, we don't need `AtomicReference`, it's too verbose and 
we don't use any atomic features (no CAS operations, only get and set). Can be 
replaced with volatile variable (List or array).

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))
+            .forEach(addr -> addrs.get(addr).setValue(true));
+
+        // create new holders for new addrs
+        resolvedAddrs.stream()
+            .filter(addr -> !addrs.containsKey(addr))
+            .forEach(addr -> {
+                ClientChannelHolder hld = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addr));
+                addrs.put(addr, new T2<>(hld, false));
+            });
+
+        if (!stopInitCondition()) {
+            List<ClientChannelHolder> list = new ArrayList<>();
+            // The variable holds a new index of default channel after 
topology change.
+            // Suppose that reuse of the channel is better than open new 
connection.
+            int dfltChannelIdx = -1;
+
+            ClientChannelHolder currHolder = null;
+            if (curChIdx != -1)
+                currHolder = channels.get().get(curChIdx);
+
+            for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+                ClientChannelHolder hld = t.get1();
+                Boolean markForDelete = t.get2();
+
+                if (markForDelete == null) {
+                    // this channel is still in use
+                    list.add(hld);
+                    if (hld == currHolder)
+                        dfltChannelIdx = list.size() - 1;
+
+                }
+                else if (markForDelete) {
+                    // this holder should be deleted now
+                    nodeChannels.values().remove(hld);
+                    hld.close();
+                }
+                else {
+                    // this channel is new
+                    list.add(hld);
+                }
+            }
+
+            if (dfltChannelIdx == -1)
+                dfltChannelIdx = new Random().nextInt(list.size());
+
+            curChannelsGuard.writeLock().lock();
+            try {
+                channels.set(list);
+                curChIdx = dfltChannelIdx;
+            } finally {
+                curChannelsGuard.writeLock().unlock();
+            }
+        }
+    }
+
+    /** Initialization of channels. */
+    private void channelsInit(boolean force) {

Review comment:
       `force` flag is only set when partition awareness is used, but I think 
sometimes we should reinit holders even without partition awareness. For 
example, if we use Kubernetes discovery and new servers were started, the 
client will use only addresses of server nodes that were available at the time 
of client start. 

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()

Review comment:
       Perhaps the code would be much simpler with a regular loop over holders, 
without streams and intermediate structures.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -114,46 +126,32 @@
         Function<ClientChannelConfiguration, ClientChannel> chFactory,
         ClientConfiguration clientCfg,
         IgniteBinary binary
-    ) throws ClientException {
+    ) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
-
-        channels = new ClientChannelHolder[addrs.size()];
-
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled();
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
-
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
-
-        affinityCtx = new ClientCacheAffinityContext(binary);
-
-        ClientConnectionException lastEx = null;
-
-        for (int i = 0; i < channels.length; i++) {
-            try {
-                channels[curChIdx].getOrCreateChannel();
-
-                if (partitionAwarenessEnabled)
-                    initAllChannelsAsync();
-
-                return;
-            } catch (ClientConnectionException e) {
-                lastEx = e;
-
-                rollCurrentChannel();
-            }
-        }
+        affCtx = new ClientCacheAffinityContext(binary);
+    }
 
-        throw lastEx;
+    /**
+     * Establishing connections to servers. If partition awareness feature is 
enabled connections are created
+     * for every configured server. Otherwise only default channel is 
connected.
+     */
+    void initConnection() {
+        channelsInit(false);
+        if (!partitionAwarenessEnabled)
+            applyOnDefaultChannel(channel -> {
+                // do nothing, just trigger channel connection.

Review comment:
       Here and below, please use Ignite code style for comments (start with 
an uppercase letter, end with a period).

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());

Review comment:
       Can we skip further steps if `clientCfg.getAddresses()` is not changed? 
(for example, in case of static addresses configuration you will always reinit 
holders, but it's redundant) 

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -356,7 +318,7 @@ private boolean affinityInfoIsUpToDate(int cacheId) {
     /**
      * @return host:port_range address lines parsed as {@link 
InetSocketAddress}.
      */
-    private static List<InetSocketAddress> parseAddresses(String[] addrs) 
throws ClientException {
+    private Set<InetSocketAddress> parseAddresses(String[] addrs) throws 
ClientException {

Review comment:
       Let's keep it `static`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -98,265 +101,291 @@
     /** Channels reinit was scheduled. */
     private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
 
-    /** Affinity map update is in progress. */
-    private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
-
     /** Channel is closed. */
     private volatile boolean closed;
 
     /** Fail (disconnect) listeners. */
-    private ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+    private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
 
-    /**
-     * Constructor.
-     */
-    ReliableChannel(
-        Function<ClientChannelConfiguration, ClientChannel> chFactory,
-        ClientConfiguration clientCfg,
-        IgniteBinary binary
-    ) throws ClientException {
+    /** Fail (disconnect) listeners. */
+    private final ArrayList<Predicate<ClientChannel>> topChangeFilters = new 
ArrayList<>();
+
+    /** Guard channels and curChIdx together. */
+    private final ReadWriteLock curChannelsGuard = new 
ReentrantReadWriteLock();
+
+    /** Constructor. */
+    ReliableChannel(ClientConfiguration clientCfg,
+                    Function<ClientChannelConfiguration, ClientChannel> 
chFactory,
+                    boolean initAllChannels) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
+        this.initAllChannels = initAllChannels;
+    }
+
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
+    /** Callback is invoked after new ClientChannel has created. */
+    private final BiConsumer<ClientChannelHolder, ClientChannel> 
onChannelCreate = (holder, ch) -> {
+        ch.addTopologyChangeListener(channel -> {
+            if (topChangeFilters.stream().allMatch(s -> s.test(channel)))
+                channelsInit(true);
+        });
 
-        channels = new ClientChannelHolder[addrs.size()];
+        ch.addNotificationListener(this);
 
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        nodeChannels.values().remove(holder);
+        nodeChannels.put(ch.serverNodeId(), holder);
+    };
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
+    /** Callback is invoked after a ClientChannel has closed. */
+    private final Consumer<ClientChannel> onChannelClose = ch -> {
+        for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+            lsnr.accept(ch);
+    };
 
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        List<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))

Review comment:
       I'm not sure that replacing `List` with `Set` is a good idea here 
(perhaps you should just copy the list to a set locally). Using a `List` you can 
solve some user problems which can't be solved with a `Set`. For example, using 
a `List` you can make some server nodes be used more often than others. Also, 
by having a single duplicated address you can work around some known issues.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());

Review comment:
       `new ArrayList<>()` -> `Collections.emptyList()`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -98,265 +101,291 @@
     /** Channels reinit was scheduled. */
     private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
 
-    /** Affinity map update is in progress. */
-    private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
-
     /** Channel is closed. */
     private volatile boolean closed;
 
     /** Fail (disconnect) listeners. */
-    private ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+    private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
 
-    /**
-     * Constructor.
-     */
-    ReliableChannel(
-        Function<ClientChannelConfiguration, ClientChannel> chFactory,
-        ClientConfiguration clientCfg,
-        IgniteBinary binary
-    ) throws ClientException {
+    /** Fail (disconnect) listeners. */
+    private final ArrayList<Predicate<ClientChannel>> topChangeFilters = new 
ArrayList<>();
+
+    /** Guard channels and curChIdx together. */
+    private final ReadWriteLock curChannelsGuard = new 
ReentrantReadWriteLock();
+
+    /** Constructor. */
+    ReliableChannel(ClientConfiguration clientCfg,
+                    Function<ClientChannelConfiguration, ClientChannel> 
chFactory,
+                    boolean initAllChannels) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
+        this.initAllChannels = initAllChannels;
+    }
+
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
+    /** Callback is invoked after new ClientChannel has created. */
+    private final BiConsumer<ClientChannelHolder, ClientChannel> 
onChannelCreate = (holder, ch) -> {
+        ch.addTopologyChangeListener(channel -> {
+            if (topChangeFilters.stream().allMatch(s -> s.test(channel)))
+                channelsInit(true);
+        });
 
-        channels = new ClientChannelHolder[addrs.size()];
+        ch.addNotificationListener(this);
 
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        nodeChannels.values().remove(holder);
+        nodeChannels.put(ch.serverNodeId(), holder);
+    };
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
+    /** Callback is invoked after a ClientChannel has closed. */
+    private final Consumer<ClientChannel> onChannelClose = ch -> {
+        for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+            lsnr.accept(ch);
+    };
 
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        List<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))
+            .forEach(addr -> addrs.get(addr).setValue(true));
+
+        // create new holders for new addrs
+        resolvedAddrs.stream()
+            .filter(addr -> !addrs.containsKey(addr))
+            .forEach(addr -> {
+                ClientChannelHolder hld = new ClientChannelHolder(
+                    new ClientChannelConfiguration(clientCfg, addr), 
chFactory, onChannelCreate, onChannelClose);
+
+                addrs.put(addr, new T2<>(hld, false));
+            });
+
+        if (!stopInitCondition()) {
+            List<ClientChannelHolder> list = new ArrayList<>();
+            // The variable holds a new index of default channel after 
topology change.
+            // Suppose that reuse of the channel is better than open new 
connection.
+            int dfltChannelIdx = -1;
+
+            ClientChannelHolder currHolder = null;
+            if (curChIdx != -1)
+                currHolder = channels.get().get(curChIdx);
+
+            for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+                ClientChannelHolder hld = t.get1();
+                Boolean markForDelete = t.get2();
+
+                if (markForDelete == null) {
+                    // this channel is still in use
+                    list.add(hld);
+                    if (hld == currHolder)
+                        dfltChannelIdx = list.size() - 1;
 
-        affinityCtx = new ClientCacheAffinityContext(binary);
+                }
+                else if (markForDelete) {
+                    // this holder should be deleted now
+                    nodeChannels.values().remove(hld);
+                    hld.close();
+                }
+                else {
+                    // this channel is new
+                    list.add(hld);
+                }
+            }
 
-        ClientConnectionException lastEx = null;
+            if (dfltChannelIdx == -1)
+                dfltChannelIdx = new Random().nextInt(list.size());
 
-        for (int i = 0; i < channels.length; i++) {
+            curChannelsGuard.writeLock().lock();

Review comment:
       Agree with Pavel, we don't need `AtomicReference`, it's too verbose and 
we don't use any atomic features (no CAS operations, only get and set). Can be 
replaced with volatile variable (List or array).

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))
+            .forEach(addr -> addrs.get(addr).setValue(true));
+
+        // create new holders for new addrs
+        resolvedAddrs.stream()
+            .filter(addr -> !addrs.containsKey(addr))
+            .forEach(addr -> {
+                ClientChannelHolder hld = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addr));
+                addrs.put(addr, new T2<>(hld, false));
+            });
+
+        if (!stopInitCondition()) {
+            List<ClientChannelHolder> list = new ArrayList<>();
+            // The variable holds a new index of default channel after 
topology change.
+            // Suppose that reuse of the channel is better than open new 
connection.
+            int dfltChannelIdx = -1;
+
+            ClientChannelHolder currHolder = null;
+            if (curChIdx != -1)
+                currHolder = channels.get().get(curChIdx);
+
+            for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+                ClientChannelHolder hld = t.get1();
+                Boolean markForDelete = t.get2();
+
+                if (markForDelete == null) {
+                    // this channel is still in use
+                    list.add(hld);
+                    if (hld == currHolder)
+                        dfltChannelIdx = list.size() - 1;
+
+                }
+                else if (markForDelete) {
+                    // this holder should be deleted now
+                    nodeChannels.values().remove(hld);
+                    hld.close();
+                }
+                else {
+                    // this channel is new
+                    list.add(hld);
+                }
+            }
+
+            if (dfltChannelIdx == -1)
+                dfltChannelIdx = new Random().nextInt(list.size());
+
+            curChannelsGuard.writeLock().lock();
+            try {
+                channels.set(list);
+                curChIdx = dfltChannelIdx;
+            } finally {
+                curChannelsGuard.writeLock().unlock();
+            }
+        }
+    }
+
+    /** Initialization of channels. */
+    private void channelsInit(boolean force) {

Review comment:
       `force` flag is only set when partition awareness is used, but I think 
sometimes we should reinit holders even without partition awareness. For 
example, if we use Kubernetes discovery and new servers were started, the 
client will use only addresses of server nodes that were available at the time 
of client start. 

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()

Review comment:
       Perhaps the code would be much simpler with a regular loop over holders, 
without streams and intermediate structures.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -114,46 +126,32 @@
         Function<ClientChannelConfiguration, ClientChannel> chFactory,
         ClientConfiguration clientCfg,
         IgniteBinary binary
-    ) throws ClientException {
+    ) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
-
-        channels = new ClientChannelHolder[addrs.size()];
-
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled();
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
-
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
-
-        affinityCtx = new ClientCacheAffinityContext(binary);
-
-        ClientConnectionException lastEx = null;
-
-        for (int i = 0; i < channels.length; i++) {
-            try {
-                channels[curChIdx].getOrCreateChannel();
-
-                if (partitionAwarenessEnabled)
-                    initAllChannelsAsync();
-
-                return;
-            } catch (ClientConnectionException e) {
-                lastEx = e;
-
-                rollCurrentChannel();
-            }
-        }
+        affCtx = new ClientCacheAffinityContext(binary);
+    }
 
-        throw lastEx;
+    /**
+     * Establishing connections to servers. If partition awareness feature is 
enabled connections are created
+     * for every configured server. Otherwise only default channel is 
connected.
+     */
+    void initConnection() {
+        channelsInit(false);
+        if (!partitionAwarenessEnabled)
+            applyOnDefaultChannel(channel -> {
+                // do nothing, just trigger channel connection.

Review comment:
       Here and below, please use Ignite code style for comments (start with an 
uppercase letter, end with a period).

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());

Review comment:
       Can we skip further steps if `clientCfg.getAddresses()` has not changed? 
(For example, with a static address configuration you will always reinit 
holders, which is redundant.)

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -356,7 +318,7 @@ private boolean affinityInfoIsUpToDate(int cacheId) {
     /**
      * @return host:port_range address lines parsed as {@link 
InetSocketAddress}.
      */
-    private static List<InetSocketAddress> parseAddresses(String[] addrs) 
throws ClientException {
+    private Set<InetSocketAddress> parseAddresses(String[] addrs) throws 
ClientException {

Review comment:
       Let's keep it `static`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -98,265 +101,291 @@
     /** Channels reinit was scheduled. */
     private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
 
-    /** Affinity map update is in progress. */
-    private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
-
     /** Channel is closed. */
     private volatile boolean closed;
 
     /** Fail (disconnect) listeners. */
-    private ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+    private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
 
-    /**
-     * Constructor.
-     */
-    ReliableChannel(
-        Function<ClientChannelConfiguration, ClientChannel> chFactory,
-        ClientConfiguration clientCfg,
-        IgniteBinary binary
-    ) throws ClientException {
+    /** Fail (disconnect) listeners. */
+    private final ArrayList<Predicate<ClientChannel>> topChangeFilters = new 
ArrayList<>();
+
+    /** Guard channels and curChIdx together. */
+    private final ReadWriteLock curChannelsGuard = new 
ReentrantReadWriteLock();
+
+    /** Constructor. */
+    ReliableChannel(ClientConfiguration clientCfg,
+                    Function<ClientChannelConfiguration, ClientChannel> 
chFactory,
+                    boolean initAllChannels) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
+        this.initAllChannels = initAllChannels;
+    }
+
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
+    /** Callback is invoked after new ClientChannel has created. */
+    private final BiConsumer<ClientChannelHolder, ClientChannel> 
onChannelCreate = (holder, ch) -> {
+        ch.addTopologyChangeListener(channel -> {
+            if (topChangeFilters.stream().allMatch(s -> s.test(channel)))
+                channelsInit(true);
+        });
 
-        channels = new ClientChannelHolder[addrs.size()];
+        ch.addNotificationListener(this);
 
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        nodeChannels.values().remove(holder);
+        nodeChannels.put(ch.serverNodeId(), holder);
+    };
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
+    /** Callback is invoked after a ClientChannel has closed. */
+    private final Consumer<ClientChannel> onChannelClose = ch -> {
+        for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+            lsnr.accept(ch);
+    };
 
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        List<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))

Review comment:
       I'm not sure that replacing `List` with `Set` is a good idea here 
(perhaps you should just copy the list to a set here locally). Using a `List` 
you can solve some user problems which can't be solved with a `Set`. For 
example, using a `List` you can make some server nodes be used more often than 
others. Also, by having a single duplicated address you can work around some 
known issues.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());

Review comment:
       `new ArrayList<>()` -> `Collections.emptyList()`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -98,265 +101,291 @@
     /** Channels reinit was scheduled. */
     private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
 
-    /** Affinity map update is in progress. */
-    private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
-
     /** Channel is closed. */
     private volatile boolean closed;
 
     /** Fail (disconnect) listeners. */
-    private ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+    private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
 
-    /**
-     * Constructor.
-     */
-    ReliableChannel(
-        Function<ClientChannelConfiguration, ClientChannel> chFactory,
-        ClientConfiguration clientCfg,
-        IgniteBinary binary
-    ) throws ClientException {
+    /** Fail (disconnect) listeners. */
+    private final ArrayList<Predicate<ClientChannel>> topChangeFilters = new 
ArrayList<>();
+
+    /** Guard channels and curChIdx together. */
+    private final ReadWriteLock curChannelsGuard = new 
ReentrantReadWriteLock();
+
+    /** Constructor. */
+    ReliableChannel(ClientConfiguration clientCfg,
+                    Function<ClientChannelConfiguration, ClientChannel> 
chFactory,
+                    boolean initAllChannels) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
+        this.initAllChannels = initAllChannels;
+    }
+
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
+    /** Callback is invoked after new ClientChannel has created. */
+    private final BiConsumer<ClientChannelHolder, ClientChannel> 
onChannelCreate = (holder, ch) -> {
+        ch.addTopologyChangeListener(channel -> {
+            if (topChangeFilters.stream().allMatch(s -> s.test(channel)))
+                channelsInit(true);
+        });
 
-        channels = new ClientChannelHolder[addrs.size()];
+        ch.addNotificationListener(this);
 
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        nodeChannels.values().remove(holder);
+        nodeChannels.put(ch.serverNodeId(), holder);
+    };
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
+    /** Callback is invoked after a ClientChannel has closed. */
+    private final Consumer<ClientChannel> onChannelClose = ch -> {
+        for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+            lsnr.accept(ch);
+    };
 
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        List<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))
+            .forEach(addr -> addrs.get(addr).setValue(true));
+
+        // create new holders for new addrs
+        resolvedAddrs.stream()
+            .filter(addr -> !addrs.containsKey(addr))
+            .forEach(addr -> {
+                ClientChannelHolder hld = new ClientChannelHolder(
+                    new ClientChannelConfiguration(clientCfg, addr), 
chFactory, onChannelCreate, onChannelClose);
+
+                addrs.put(addr, new T2<>(hld, false));
+            });
+
+        if (!stopInitCondition()) {
+            List<ClientChannelHolder> list = new ArrayList<>();
+            // The variable holds a new index of default channel after 
topology change.
+            // Suppose that reuse of the channel is better than open new 
connection.
+            int dfltChannelIdx = -1;
+
+            ClientChannelHolder currHolder = null;
+            if (curChIdx != -1)
+                currHolder = channels.get().get(curChIdx);
+
+            for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+                ClientChannelHolder hld = t.get1();
+                Boolean markForDelete = t.get2();
+
+                if (markForDelete == null) {
+                    // this channel is still in use
+                    list.add(hld);
+                    if (hld == currHolder)
+                        dfltChannelIdx = list.size() - 1;
 
-        affinityCtx = new ClientCacheAffinityContext(binary);
+                }
+                else if (markForDelete) {
+                    // this holder should be deleted now
+                    nodeChannels.values().remove(hld);
+                    hld.close();
+                }
+                else {
+                    // this channel is new
+                    list.add(hld);
+                }
+            }
 
-        ClientConnectionException lastEx = null;
+            if (dfltChannelIdx == -1)
+                dfltChannelIdx = new Random().nextInt(list.size());
 
-        for (int i = 0; i < channels.length; i++) {
+            curChannelsGuard.writeLock().lock();

Review comment:
       Agree with Pavel: we don't need `AtomicReference`. It's too verbose and 
we don't use any atomic features (no CAS operations, only get and set). It can 
be replaced with a volatile variable (List or array).

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))
+            .forEach(addr -> addrs.get(addr).setValue(true));
+
+        // create new holders for new addrs
+        resolvedAddrs.stream()
+            .filter(addr -> !addrs.containsKey(addr))
+            .forEach(addr -> {
+                ClientChannelHolder hld = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addr));
+                addrs.put(addr, new T2<>(hld, false));
+            });
+
+        if (!stopInitCondition()) {
+            List<ClientChannelHolder> list = new ArrayList<>();
+            // The variable holds a new index of default channel after 
topology change.
+            // Suppose that reuse of the channel is better than open new 
connection.
+            int dfltChannelIdx = -1;
+
+            ClientChannelHolder currHolder = null;
+            if (curChIdx != -1)
+                currHolder = channels.get().get(curChIdx);
+
+            for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+                ClientChannelHolder hld = t.get1();
+                Boolean markForDelete = t.get2();
+
+                if (markForDelete == null) {
+                    // this channel is still in use
+                    list.add(hld);
+                    if (hld == currHolder)
+                        dfltChannelIdx = list.size() - 1;
+
+                }
+                else if (markForDelete) {
+                    // this holder should be deleted now
+                    nodeChannels.values().remove(hld);
+                    hld.close();
+                }
+                else {
+                    // this channel is new
+                    list.add(hld);
+                }
+            }
+
+            if (dfltChannelIdx == -1)
+                dfltChannelIdx = new Random().nextInt(list.size());
+
+            curChannelsGuard.writeLock().lock();
+            try {
+                channels.set(list);
+                curChIdx = dfltChannelIdx;
+            } finally {
+                curChannelsGuard.writeLock().unlock();
+            }
+        }
+    }
+
+    /** Initialization of channels. */
+    private void channelsInit(boolean force) {

Review comment:
       The `force` flag is only set when partition awareness is used, but I think 
sometimes we should reinit holders even without partition awareness. For 
example, if we use Kubernetes discovery and new servers are started, the 
client will keep using only the addresses of server nodes that were available 
at the time of client start.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()

Review comment:
       Perhaps the code would be much simpler with a regular loop over holders, 
without streams and intermediate structures.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -114,46 +126,32 @@
         Function<ClientChannelConfiguration, ClientChannel> chFactory,
         ClientConfiguration clientCfg,
         IgniteBinary binary
-    ) throws ClientException {
+    ) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
-
-        channels = new ClientChannelHolder[addrs.size()];
-
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled();
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
-
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
-
-        affinityCtx = new ClientCacheAffinityContext(binary);
-
-        ClientConnectionException lastEx = null;
-
-        for (int i = 0; i < channels.length; i++) {
-            try {
-                channels[curChIdx].getOrCreateChannel();
-
-                if (partitionAwarenessEnabled)
-                    initAllChannelsAsync();
-
-                return;
-            } catch (ClientConnectionException e) {
-                lastEx = e;
-
-                rollCurrentChannel();
-            }
-        }
+        affCtx = new ClientCacheAffinityContext(binary);
+    }
 
-        throw lastEx;
+    /**
+     * Establishing connections to servers. If partition awareness feature is 
enabled connections are created
+     * for every configured server. Otherwise only default channel is 
connected.
+     */
+    void initConnection() {
+        channelsInit(false);
+        if (!partitionAwarenessEnabled)
+            applyOnDefaultChannel(channel -> {
+                // do nothing, just trigger channel connection.

Review comment:
       Here and below, please use Ignite code style for comments (start with an 
uppercase letter, end with a period).

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());

Review comment:
       Can we skip further steps if `clientCfg.getAddresses()` has not changed? 
(For example, with a static address configuration you will always reinit 
holders, which is redundant.)

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -356,7 +318,7 @@ private boolean affinityInfoIsUpToDate(int cacheId) {
     /**
      * @return host:port_range address lines parsed as {@link 
InetSocketAddress}.
      */
-    private static List<InetSocketAddress> parseAddresses(String[] addrs) 
throws ClientException {
+    private Set<InetSocketAddress> parseAddresses(String[] addrs) throws 
ClientException {

Review comment:
       Let's keep it `static`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -98,265 +101,291 @@
     /** Channels reinit was scheduled. */
     private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
 
-    /** Affinity map update is in progress. */
-    private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
-
     /** Channel is closed. */
     private volatile boolean closed;
 
     /** Fail (disconnect) listeners. */
-    private ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+    private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
 
-    /**
-     * Constructor.
-     */
-    ReliableChannel(
-        Function<ClientChannelConfiguration, ClientChannel> chFactory,
-        ClientConfiguration clientCfg,
-        IgniteBinary binary
-    ) throws ClientException {
+    /** Fail (disconnect) listeners. */
+    private final ArrayList<Predicate<ClientChannel>> topChangeFilters = new 
ArrayList<>();
+
+    /** Guard channels and curChIdx together. */
+    private final ReadWriteLock curChannelsGuard = new 
ReentrantReadWriteLock();
+
+    /** Constructor. */
+    ReliableChannel(ClientConfiguration clientCfg,
+                    Function<ClientChannelConfiguration, ClientChannel> 
chFactory,
+                    boolean initAllChannels) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
+        this.initAllChannels = initAllChannels;
+    }
+
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
+    /** Callback is invoked after new ClientChannel has created. */
+    private final BiConsumer<ClientChannelHolder, ClientChannel> 
onChannelCreate = (holder, ch) -> {
+        ch.addTopologyChangeListener(channel -> {
+            if (topChangeFilters.stream().allMatch(s -> s.test(channel)))
+                channelsInit(true);
+        });
 
-        channels = new ClientChannelHolder[addrs.size()];
+        ch.addNotificationListener(this);
 
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        nodeChannels.values().remove(holder);
+        nodeChannels.put(ch.serverNodeId(), holder);
+    };
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
+    /** Callback is invoked after a ClientChannel has closed. */
+    private final Consumer<ClientChannel> onChannelClose = ch -> {
+        for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+            lsnr.accept(ch);
+    };
 
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        List<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))

Review comment:
       I'm not sure that replacing `List` with `Set` is a good idea here 
(perhaps you should just copy the list to a set here locally). Using a `List` 
you can solve some user problems which can't be solved with a `Set`. For 
example, using a `List` you can make some server nodes be used more often than 
others. Also, by having a single duplicated address you can work around some 
known issues.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());

Review comment:
       `new ArrayList<>()` -> `Collections.emptyList()`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -98,265 +101,291 @@
     /** Channels reinit was scheduled. */
     private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
 
-    /** Affinity map update is in progress. */
-    private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
-
     /** Channel is closed. */
     private volatile boolean closed;
 
     /** Fail (disconnect) listeners. */
-    private ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+    private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
 
-    /**
-     * Constructor.
-     */
-    ReliableChannel(
-        Function<ClientChannelConfiguration, ClientChannel> chFactory,
-        ClientConfiguration clientCfg,
-        IgniteBinary binary
-    ) throws ClientException {
+    /** Fail (disconnect) listeners. */
+    private final ArrayList<Predicate<ClientChannel>> topChangeFilters = new 
ArrayList<>();
+
+    /** Guard channels and curChIdx together. */
+    private final ReadWriteLock curChannelsGuard = new 
ReentrantReadWriteLock();
+
+    /** Constructor. */
+    ReliableChannel(ClientConfiguration clientCfg,
+                    Function<ClientChannelConfiguration, ClientChannel> 
chFactory,
+                    boolean initAllChannels) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
+        this.initAllChannels = initAllChannels;
+    }
+
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
+    /** Callback is invoked after new ClientChannel has created. */
+    private final BiConsumer<ClientChannelHolder, ClientChannel> 
onChannelCreate = (holder, ch) -> {
+        ch.addTopologyChangeListener(channel -> {
+            if (topChangeFilters.stream().allMatch(s -> s.test(channel)))
+                channelsInit(true);
+        });
 
-        channels = new ClientChannelHolder[addrs.size()];
+        ch.addNotificationListener(this);
 
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        nodeChannels.values().remove(holder);
+        nodeChannels.put(ch.serverNodeId(), holder);
+    };
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
+    /** Callback is invoked after a ClientChannel has closed. */
+    private final Consumer<ClientChannel> onChannelClose = ch -> {
+        for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+            lsnr.accept(ch);
+    };
 
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        List<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))
+            .forEach(addr -> addrs.get(addr).setValue(true));
+
+        // create new holders for new addrs
+        resolvedAddrs.stream()
+            .filter(addr -> !addrs.containsKey(addr))
+            .forEach(addr -> {
+                ClientChannelHolder hld = new ClientChannelHolder(
+                    new ClientChannelConfiguration(clientCfg, addr), 
chFactory, onChannelCreate, onChannelClose);
+
+                addrs.put(addr, new T2<>(hld, false));
+            });
+
+        if (!stopInitCondition()) {
+            List<ClientChannelHolder> list = new ArrayList<>();
+            // The variable holds a new index of default channel after 
topology change.
+            // Suppose that reuse of the channel is better than open new 
connection.
+            int dfltChannelIdx = -1;
+
+            ClientChannelHolder currHolder = null;
+            if (curChIdx != -1)
+                currHolder = channels.get().get(curChIdx);
+
+            for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+                ClientChannelHolder hld = t.get1();
+                Boolean markForDelete = t.get2();
+
+                if (markForDelete == null) {
+                    // this channel is still in use
+                    list.add(hld);
+                    if (hld == currHolder)
+                        dfltChannelIdx = list.size() - 1;
 
-        affinityCtx = new ClientCacheAffinityContext(binary);
+                }
+                else if (markForDelete) {
+                    // this holder should be deleted now
+                    nodeChannels.values().remove(hld);
+                    hld.close();
+                }
+                else {
+                    // this channel is new
+                    list.add(hld);
+                }
+            }
 
-        ClientConnectionException lastEx = null;
+            if (dfltChannelIdx == -1)
+                dfltChannelIdx = new Random().nextInt(list.size());
 
-        for (int i = 0; i < channels.length; i++) {
+            curChannelsGuard.writeLock().lock();

Review comment:
       Agree with Pavel: we don't need `AtomicReference`. It's too verbose and 
we don't use any atomic features (no CAS operations, only get and set). It can 
be replaced with a volatile variable (List or array).

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))
+            .forEach(addr -> addrs.get(addr).setValue(true));
+
+        // create new holders for new addrs
+        resolvedAddrs.stream()
+            .filter(addr -> !addrs.containsKey(addr))
+            .forEach(addr -> {
+                ClientChannelHolder hld = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addr));
+                addrs.put(addr, new T2<>(hld, false));
+            });
+
+        if (!stopInitCondition()) {
+            List<ClientChannelHolder> list = new ArrayList<>();
+            // The variable holds a new index of default channel after 
topology change.
+            // Suppose that reuse of the channel is better than open new 
connection.
+            int dfltChannelIdx = -1;
+
+            ClientChannelHolder currHolder = null;
+            if (curChIdx != -1)
+                currHolder = channels.get().get(curChIdx);
+
+            for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+                ClientChannelHolder hld = t.get1();
+                Boolean markForDelete = t.get2();
+
+                if (markForDelete == null) {
+                    // this channel is still in use
+                    list.add(hld);
+                    if (hld == currHolder)
+                        dfltChannelIdx = list.size() - 1;
+
+                }
+                else if (markForDelete) {
+                    // this holder should be deleted now
+                    nodeChannels.values().remove(hld);
+                    hld.close();
+                }
+                else {
+                    // this channel is new
+                    list.add(hld);
+                }
+            }
+
+            if (dfltChannelIdx == -1)
+                dfltChannelIdx = new Random().nextInt(list.size());
+
+            curChannelsGuard.writeLock().lock();
+            try {
+                channels.set(list);
+                curChIdx = dfltChannelIdx;
+            } finally {
+                curChannelsGuard.writeLock().unlock();
+            }
+        }
+    }
+
+    /** Initialization of channels. */
+    private void channelsInit(boolean force) {

Review comment:
       The `force` flag is only set when partition awareness is used, but I 
think sometimes we should reinit holders even without partition awareness. For 
example, if we use Kubernetes discovery and new servers are started, the 
client will use only the addresses of server nodes that were available at the 
time of client start.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()

Review comment:
       Perhaps the code would be much simpler with a regular loop over the 
holders, without streams and intermediate structures.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to