timoninmaxim commented on a change in pull request #8206:
URL: https://github.com/apache/ignite/pull/8206#discussion_r482847371



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -98,265 +101,291 @@
     /** Channels reinit was scheduled. */
     private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
 
-    /** Affinity map update is in progress. */
-    private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
-
     /** Channel is closed. */
     private volatile boolean closed;
 
     /** Fail (disconnect) listeners. */
-    private ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+    private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
 
-    /**
-     * Constructor.
-     */
-    ReliableChannel(
-        Function<ClientChannelConfiguration, ClientChannel> chFactory,
-        ClientConfiguration clientCfg,
-        IgniteBinary binary
-    ) throws ClientException {
+    /** Fail (disconnect) listeners. */
+    private final ArrayList<Predicate<ClientChannel>> topChangeFilters = new 
ArrayList<>();
+
+    /** Guard channels and curChIdx together. */
+    private final ReadWriteLock curChannelsGuard = new 
ReentrantReadWriteLock();
+
+    /** Constructor. */
+    ReliableChannel(ClientConfiguration clientCfg,
+                    Function<ClientChannelConfiguration, ClientChannel> 
chFactory,
+                    boolean initAllChannels) {
         if (chFactory == null)
             throw new NullPointerException("chFactory");
 
         if (clientCfg == null)
             throw new NullPointerException("clientCfg");
 
+        this.clientCfg = clientCfg;
         this.chFactory = chFactory;
+        this.initAllChannels = initAllChannels;
+    }
+
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
 
-        List<InetSocketAddress> addrs = 
parseAddresses(clientCfg.getAddresses());
+    /** Callback is invoked after new ClientChannel has created. */
+    private final BiConsumer<ClientChannelHolder, ClientChannel> 
onChannelCreate = (holder, ch) -> {
+        ch.addTopologyChangeListener(channel -> {
+            if (topChangeFilters.stream().allMatch(s -> s.test(channel)))
+                channelsInit(true);
+        });
 
-        channels = new ClientChannelHolder[addrs.size()];
+        ch.addNotificationListener(this);
 
-        for (int i = 0; i < channels.length; i++)
-            channels[i] = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+        nodeChannels.values().remove(holder);
+        nodeChannels.put(ch.serverNodeId(), holder);
+    };
 
-        curChIdx = new Random().nextInt(channels.length); // We already 
verified there is at least one address.
+    /** Callback is invoked after a ClientChannel has closed. */
+    private final Consumer<ClientChannel> onChannelClose = ch -> {
+        for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+            lsnr.accept(ch);
+    };
 
-        partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && 
channels.length > 1;
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        List<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))
+            .forEach(addr -> addrs.get(addr).setValue(true));
+
+        // create new holders for new addrs
+        resolvedAddrs.stream()
+            .filter(addr -> !addrs.containsKey(addr))
+            .forEach(addr -> {
+                ClientChannelHolder hld = new ClientChannelHolder(
+                    new ClientChannelConfiguration(clientCfg, addr), 
chFactory, onChannelCreate, onChannelClose);
+
+                addrs.put(addr, new T2<>(hld, false));
+            });
+
+        if (!stopInitCondition()) {
+            List<ClientChannelHolder> list = new ArrayList<>();
+            // The variable holds a new index of default channel after 
topology change.
+            // Suppose that reuse of the channel is better than open new 
connection.
+            int dfltChannelIdx = -1;
+
+            ClientChannelHolder currHolder = null;
+            if (curChIdx != -1)
+                currHolder = channels.get().get(curChIdx);
+
+            for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+                ClientChannelHolder hld = t.get1();
+                Boolean markForDelete = t.get2();
+
+                if (markForDelete == null) {
+                    // this channel is still in use
+                    list.add(hld);
+                    if (hld == currHolder)
+                        dfltChannelIdx = list.size() - 1;
 
-        affinityCtx = new ClientCacheAffinityContext(binary);
+                }
+                else if (markForDelete) {
+                    // this holder should be deleted now
+                    nodeChannels.values().remove(hld);
+                    hld.close();
+                }
+                else {
+                    // this channel is new
+                    list.add(hld);
+                }
+            }
 
-        ClientConnectionException lastEx = null;
+            if (dfltChannelIdx == -1)
+                dfltChannelIdx = new Random().nextInt(list.size());
 
-        for (int i = 0; i < channels.length; i++) {
+            curChannelsGuard.writeLock().lock();

Review comment:
       There are different goals for those sync operations:
   1. Synchronized method guarantees that only one thread is initializing 
holders. This is required to limit requests to the AddressFinder, and avoid do 
the same work for prepare addresses multiple times;
   2. AtomicReference is required as list of addresses are changed due to the 
preparation - merging new addresses, removing obsolete addresses. This 
performed in the "thin-client-channel#" thread.
   3. The same time channels and CurChIdx are used in non-synchronized methods 
in user threads. There should be a guarantee for consistency between those 
variables. So this a reason to use the guard.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to