timoninmaxim commented on a change in pull request #8206:
URL: https://github.com/apache/ignite/pull/8206#discussion_r497485647



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -98,265 +101,291 @@
     /** Channels reinit was scheduled. */
     private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
 
-    /** Affinity map update is in progress. */
-    private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
-
     /** Channel is closed. */
     private volatile boolean closed;
 
     /** Fail (disconnect) listeners. */
-    private ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+    private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
 
-    /**
-     * Constructor.
-     */
-    ReliableChannel(
-        Function<ClientChannelConfiguration, ClientChannel> chFactory,
-        ClientConfiguration clientCfg,
-        IgniteBinary binary
-    ) throws ClientException {
+    /** Fail (disconnect) listeners. */
+    private final ArrayList<Predicate<ClientChannel>> topChangeFilters = new 
ArrayList<>();
+
+    /** Guard channels and curChIdx together. */
+    private final ReadWriteLock curChannelsGuard = new 
ReentrantReadWriteLock();
+
+    /** Constructor. */
+    ReliableChannel(ClientConfiguration clientCfg,
+                    Function<ClientChannelConfiguration, ClientChannel> 
chFactory,
+                    boolean initAllChannels) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
+        this.initAllChannels = initAllChannels;
+    }
+
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
+    /** Callback is invoked after new ClientChannel has created. */
+    private final BiConsumer<ClientChannelHolder, ClientChannel> 
onChannelCreate = (holder, ch) -> {
+        ch.addTopologyChangeListener(channel -> {
+            if (topChangeFilters.stream().allMatch(s -> s.test(channel)))
+                channelsInit(true);
+        });
 
-        channels = new ClientChannelHolder[addrs.size()];
+        ch.addNotificationListener(this);
 
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        nodeChannels.values().remove(holder);
+        nodeChannels.put(ch.serverNodeId(), holder);
+    };
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
+    /** Callback is invoked after a ClientChannel has closed. */
+    private final Consumer<ClientChannel> onChannelClose = ch -> {
+        for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+            lsnr.accept(ch);
+    };
 
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        List<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))

Review comment:
       Fixed, added tests for that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to