timoninmaxim commented on a change in pull request #8206:
URL: https://github.com/apache/ignite/pull/8206#discussion_r488673640



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -381,78 +343,59 @@ private boolean affinityInfoIsUpToDate(int cacheId) {
                 .rangeClosed(r.portFrom(), r.portTo()).boxed()
                 .map(p -> new InetSocketAddress(r.host(), p))
             )
-            .collect(Collectors.toList());
-    }
-
-    /** */
-    private synchronized ClientChannel channel() {
-        if (closed)
-            throw new ClientException("Channel is closed");
-
-        try {
-            return channels[curChIdx].getOrCreateChannel();
-        }
-        catch (ClientConnectionException e) {
-            rollCurrentChannel();
-
-            throw e;
-        }
-    }
-
-    /** */
-    private synchronized void rollCurrentChannel() {
-        if (++curChIdx >= channels.length)
-            curChIdx = 0;
+            .collect(Collectors.toSet());
     }
 
     /**
-     * On current channel failure.
+     * Roll current default channel if specified holder equals to it.
      */
-    private synchronized void onChannelFailure(ClientChannel ch) {
-        // There is nothing wrong if curChIdx was concurrently changed, since 
channel was closed by another thread
-        // when current index was changed and no other wrong channel will be 
closed by current thread because
-        // onChannelFailure checks channel binded to the holder before closing 
it.
-        onChannelFailure(channels[curChIdx], ch);
-
-        chFailLsnrs.forEach(Runnable::run);
+    private void rollCurrentChannel(ClientChannelHolder hld) {
+        curChannelsGuard.writeLock().lock();
+        try {
+            ClientChannelHolder dfltHld = channels.get().get(curChIdx);
+            if (dfltHld == hld) {
+                int idx = curChIdx + 1;
+                if (idx >= channels.get().size())
+                    curChIdx = 0;
+                else
+                    curChIdx = idx;
+            }
+        } finally {
+            curChannelsGuard.writeLock().unlock();
+        }
     }
 
     /**
      * On channel of the specified holder failure.
      */
-    private synchronized void onChannelFailure(ClientChannelHolder hld, 
ClientChannel ch) {
-        if (ch == hld.ch && ch != null) {
+    private void onChannelFailure(ClientChannelHolder hld, ClientChannel ch) {
+        if (hld != null && ch != null && ch == hld.ch)
             hld.closeChannel();
 
-            if (hld == channels[curChIdx])
-                rollCurrentChannel();
-        }
+        rollCurrentChannel(hld);
+
+        chFailLsnrs.forEach(Runnable::run);
     }
 
     /**
      * Asynchronously try to establish a connection to all configured servers.
      */
-    private void initAllChannelsAsync() {
-        // Skip if there is already channels reinit scheduled.
-        if (scheduledChannelsReinit.compareAndSet(false, true)) {
-            asyncRunner.submit(
-                () -> {
-                    scheduledChannelsReinit.set(false);
-
-                    for (ClientChannelHolder hld : channels) {
-                        if (scheduledChannelsReinit.get() || closed)
-                            return; // New reinit task scheduled or channel is 
closed.
-
-                        try {
-                            hld.getOrCreateChannel(true);
-                        }
-                        catch (Exception ignore) {
-                            // No-op.
-                        }
+    void initAllChannelsAsync() {

Review comment:
       Fixed.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {

Review comment:
       Fixed.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))
+            .forEach(addr -> addrs.get(addr).setValue(true));
+
+        // create new holders for new addrs
+        resolvedAddrs.stream()
+            .filter(addr -> !addrs.containsKey(addr))
+            .forEach(addr -> {
+                ClientChannelHolder hld = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addr));
+                addrs.put(addr, new T2<>(hld, false));
+            });
+
+        if (!stopInitCondition()) {
+            List<ClientChannelHolder> list = new ArrayList<>();
+            // The variable holds a new index of default channel after 
topology change.
+            // Suppose that reuse of the channel is better than open new 
connection.
+            int dfltChannelIdx = -1;
+
+            ClientChannelHolder currHolder = null;
+            if (curChIdx != -1)
+                currHolder = channels.get().get(curChIdx);
+
+            for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+                ClientChannelHolder hld = t.get1();
+                Boolean markForDelete = t.get2();
+
+                if (markForDelete == null) {
+                    // this channel is still in use
+                    list.add(hld);
+                    if (hld == currHolder)
+                        dfltChannelIdx = list.size() - 1;
+
+                }
+                else if (markForDelete) {
+                    // this holder should be deleted now
+                    nodeChannels.values().remove(hld);
+                    hld.close();
+                }
+                else {
+                    // this channel is new
+                    list.add(hld);
+                }
+            }
+
+            if (dfltChannelIdx == -1)
+                dfltChannelIdx = new Random().nextInt(list.size());
+
+            curChannelsGuard.writeLock().lock();
+            try {
+                channels.set(list);
+                curChIdx = dfltChannelIdx;
+            } finally {
+                curChannelsGuard.writeLock().unlock();
+            }
+        }
+    }
+
+    /** Initialization of channels. */
+    private void channelsInit(boolean force) {
+        if (!force && channels.get() != null)
+            return;
+
+        // Skip if there is already channels reinit scheduled.
+        // Flag is set back when a thread comes in synchronized 
initChannelHolders
+        if (scheduledChannelsReinit.compareAndSet(false, true)) {
+            initChannelHolders(force);
+
+            if (partitionAwarenessEnabled)
+                initAllChannelsAsync();
+        }
+    }
+
+    /**
+     * Apply specified {@code function} on a channel corresponding to 
specified {@code nodeId}.
+     */
+    private <T> T applyOnNodeChannel(UUID nodeId, Function<ClientChannel, T> 
function) {
+        ClientChannelHolder hld = null;
+        ClientChannel channel = null;
+
+        try {
+            hld = nodeChannels.get(nodeId);
+
+            channel = Optional

Review comment:
       Fixed.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = 
parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = 
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = 
holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))
+            .forEach(addr -> addrs.get(addr).setValue(true));
+
+        // create new holders for new addrs
+        resolvedAddrs.stream()
+            .filter(addr -> !addrs.containsKey(addr))
+            .forEach(addr -> {
+                ClientChannelHolder hld = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addr));
+                addrs.put(addr, new T2<>(hld, false));
+            });
+
+        if (!stopInitCondition()) {
+            List<ClientChannelHolder> list = new ArrayList<>();
+            // The variable holds a new index of default channel after 
topology change.
+            // Suppose that reuse of the channel is better than open new 
connection.
+            int dfltChannelIdx = -1;
+
+            ClientChannelHolder currHolder = null;
+            if (curChIdx != -1)
+                currHolder = channels.get().get(curChIdx);
+
+            for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+                ClientChannelHolder hld = t.get1();
+                Boolean markForDelete = t.get2();
+
+                if (markForDelete == null) {
+                    // this channel is still in use
+                    list.add(hld);
+                    if (hld == currHolder)
+                        dfltChannelIdx = list.size() - 1;
+
+                }
+                else if (markForDelete) {
+                    // this holder should be deleted now
+                    nodeChannels.values().remove(hld);

Review comment:
       Fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to