timoninmaxim commented on a change in pull request #8206:
URL: https://github.com/apache/ignite/pull/8206#discussion_r488674967



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Whether channel initialization should be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Inits channel holders for all nodes.
+     *
+     * @param force If {@code true}, existing channels are replaced with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // Enable parallel threads to schedule a new init of channel holders.
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete flag): null - existing holder to keep, false - newly created, true - to be closed.
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+            ));
+
+        // Mark for deletion the addresses that are no longer provided by the client configuration.
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))
+            .forEach(addr -> addrs.get(addr).setValue(true));
+
+        // Create new holders for newly added addresses.
+        resolvedAddrs.stream()
+            .filter(addr -> !addrs.containsKey(addr))
+            .forEach(addr -> {
+                ClientChannelHolder hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addr));
+                addrs.put(addr, new T2<>(hld, false));
+            });
+
+        if (!stopInitCondition()) {
+            List<ClientChannelHolder> list = new ArrayList<>();
+
+            // The variable holds the new index of the default channel after a topology change.
+            // We assume that reusing an existing channel is better than opening a new connection.
+            int dfltChannelIdx = -1;
+
+            ClientChannelHolder currHolder = null;
+
+            if (curChIdx != -1)
+                currHolder = channels.get().get(curChIdx);
+
+            for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+                ClientChannelHolder hld = t.get1();
+                Boolean markForDelete = t.get2();
+
+                if (markForDelete == null) {
+                    // The channel is still in use: keep it.
+                    list.add(hld);
+
+                    if (hld == currHolder)
+                        dfltChannelIdx = list.size() - 1;
+                }
+                else if (markForDelete) {
+                    // The holder is marked for deletion: drop it from the node map and close it.
+                    nodeChannels.values().remove(hld);
+                    hld.close();
+                }
+                else {
+                    // The holder was created for a newly added address.
+                    list.add(hld);
+                }
+            }
+
+            // The previous default channel did not survive the update: fall back to a random one.
+            if (dfltChannelIdx == -1)
+                dfltChannelIdx = new Random().nextInt(list.size());
+
+            curChannelsGuard.writeLock().lock();
+
+            try {
+                channels.set(list);
+                curChIdx = dfltChannelIdx;
+            }
+            finally {
+                curChannelsGuard.writeLock().unlock();
+            }
+        }
+    }
+
+    /** Initialization of channels. */
+    private void channelsInit(boolean force) {

Review comment:
       Fixed. Skipped the `partitionAwareness` flag check; now only the topology version is checked.
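
For context, a minimal standalone sketch of the pattern this describes: reinitialization is keyed off a topology-version change rather than the `partitionAwareness` flag. All names below (`TopologyVersionGuard`, `shouldReinit`, `lastTopVer`) are hypothetical illustrations, not actual members of `ReliableChannel`:

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch of the check described above: channel re-init is
 * triggered by a topology-version bump, independent of any feature flag.
 * Not the PR's actual code.
 */
class TopologyVersionGuard {
    /** Last topology version for which channels were (re)initialized. */
    private final AtomicReference<Long> lastTopVer = new AtomicReference<>();

    /**
     * @param serverTopVer Topology version reported by the server.
     * @return {@code true} if the caller should reinitialize its channels.
     */
    boolean shouldReinit(long serverTopVer) {
        Long prev = lastTopVer.get();

        // Re-init on the first observed version or when the server reports
        // a newer one; the CAS keeps concurrent callers from both triggering.
        return (prev == null || serverTopVer > prev)
            && lastTopVer.compareAndSet(prev, serverTopVer);
    }
}
```

A caller would run this check on every response that carries a topology version and schedule `initChannelHolders(true)` only when it returns `true`.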



