ptupitsyn commented on a change in pull request #8206:
URL: https://github.com/apache/ignite/pull/8206#discussion_r486971241



##########
File path: 
modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
##########
@@ -115,18 +125,35 @@
     /** Reconnect throttling retries. See {@code reconnectThrottlingPeriod}. */
     private int reconnectThrottlingRetries = 3;
 
+    /** Retry limit. */
+    private int retryLimit = 0;
+
     /**
      * @return Host addresses.
      */
     public String[] getAddresses() {
-        return addrs;
+        return Optional.ofNullable(addrFinder)
+            .map(Supplier::get)
+            .orElse(addrs);
     }
 
     /**
      * @param addrs Host addresses.
      */
     public ClientConfiguration setAddresses(String... addrs) {
-        this.addrs = addrs;
+        if (addrs != null) {

Review comment:
       Same here - let's revert and move the logic to `ReliableChannel`.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
##########
@@ -115,18 +125,35 @@
     /** Reconnect throttling retries. See {@code reconnectThrottlingPeriod}. */
     private int reconnectThrottlingRetries = 3;
 
+    /** Retry limit. */
+    private int retryLimit = 0;
+
     /**
      * @return Host addresses.
      */
     public String[] getAddresses() {
-        return addrs;
+        return Optional.ofNullable(addrFinder)
+            .map(Supplier::get)
+            .orElse(addrs);
     }
 
     /**
      * @param addrs Host addresses.
      */
     public ClientConfiguration setAddresses(String... addrs) {
-        this.addrs = addrs;
+        if (addrs != null) {
+            this.addrs = Arrays.copyOf(addrs, addrs.length);
+            addrFinder = () -> this.addrs;
+        }
+
+        return this;
+    }
+
+    /**
+     * @param finder function that finds node addresses
+     */
+    public ClientConfiguration setAddressesFinder(Supplier<String[]> finder) {

Review comment:
       Corresponding getter is missing.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -169,8 +167,10 @@
             // No-op.
         }
 
-        for (ClientChannelHolder hld : channels)
-            hld.closeChannel();
+        if (channels.get() != null) {
+            for (ClientChannelHolder hld: channels.get())

Review comment:
       Here and below: race condition. `channels.get()` may return null after the
null check. Read it into a local variable once, then null-check and iterate over
that variable.
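
A minimal sketch of the "copy to a variable" pattern described above. `SnapshotCloseExample` and `Holder` are hypothetical stand-ins, not the actual `ReliableChannel` or `ClientChannelHolder` classes; only the single read of the reference is the point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for ReliableChannel; only the snapshot pattern matters. */
class SnapshotCloseExample {
    /** Hypothetical holder that merely records that it was closed. */
    static class Holder {
        boolean closed;

        void closeChannel() {
            closed = true;
        }
    }

    /** May be replaced, or reset to null, by another thread at any time. */
    final AtomicReference<List<Holder>> channels = new AtomicReference<>();

    /** Closes all current holders and returns how many were closed. */
    int closeAll() {
        // Read the reference once; every later step uses this local snapshot,
        // so a concurrent set(null) cannot cause a NullPointerException here.
        List<Holder> holders = channels.get();

        if (holders == null)
            return 0;

        for (Holder hld : holders)
            hld.closeChannel();

        return holders.size();
    }

    public static void main(String[] args) {
        SnapshotCloseExample ch = new SnapshotCloseExample();

        System.out.println(ch.closeAll()); // channels not initialized yet

        List<Holder> list = new ArrayList<>();
        list.add(new Holder());
        list.add(new Holder());
        ch.channels.set(list);

        System.out.println(ch.closeAll());
    }
}
```

Calling `channels.get()` twice (once for the check, once for the loop) is what opens the window for the race; the local variable closes it.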

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -62,16 +68,19 @@
     private final Function<ClientChannelConfiguration, ClientChannel> chFactory;
 
     /** Client channel holders for each configured address. */
-    private final ClientChannelHolder[] channels;
+    private final AtomicReference<List<ClientChannelHolder>> channels = new AtomicReference<>();
 
     /** Index of the current channel. */
-    private int curChIdx;
+    private volatile int curChIdx = -1;
 
     /** Partition awareness enabled. */
     private final boolean partitionAwarenessEnabled;
 
     /** Cache partition awareness context. */
-    private final ClientCacheAffinityContext affinityCtx;
+    private final ClientCacheAffinityContext affCtx;

Review comment:
       Please revert unnecessary renames to reduce the scope of changes.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
##########
@@ -114,6 +114,7 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
         binary = new ClientBinary(marsh);
 
         ch = new ReliableChannel(chFactory, cfg, binary);
+        ch.initConnection();

Review comment:
       Is there a reason to bring this outside of the constructor?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -62,16 +68,19 @@
     private final Function<ClientChannelConfiguration, ClientChannel> chFactory;
 
     /** Client channel holders for each configured address. */
-    private final ClientChannelHolder[] channels;
+    private final AtomicReference<List<ClientChannelHolder>> channels = new AtomicReference<>();

Review comment:
       `AtomicReference` is not needed; a `volatile` field is enough, since only
the `get()` and `set()` methods are used below.
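
A short sketch of the `volatile` alternative, assuming the field is only ever read and overwritten (no `compareAndSet` or similar). `VolatileChannelsExample` is a hypothetical class, and `String` stands in for the real holder type.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical class showing a volatile field used in place of AtomicReference. */
class VolatileChannelsExample {
    /** A volatile write is visible to every thread that subsequently reads the field. */
    private volatile List<String> channels;

    /** Plain assignment; has the same memory effect as AtomicReference.set(). */
    void replace(List<String> newChannels) {
        channels = newChannels;
    }

    /** Single volatile read into a local; same memory effect as AtomicReference.get(). */
    int count() {
        List<String> snapshot = channels;

        return snapshot == null ? 0 : snapshot.size();
    }

    public static void main(String[] args) {
        VolatileChannelsExample ex = new VolatileChannelsExample();

        System.out.println(ex.count());

        ex.replace(Arrays.asList("host1:10800", "host2:10800"));

        System.out.println(ex.count());
    }
}
```

`AtomicReference` earns its keep only when an atomic read-modify-write such as `compareAndSet` or `getAndSet` is required.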

##########
File path: 
modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
##########
@@ -493,6 +520,25 @@ public ClientConfiguration setReconnectThrottlingRetries(int reconnectThrottling
         return this;
     }
 
+    /**
+     * Get retry limit.
+     */
+    public int getRetryLimit() {
+        return retryLimit;
+    }
+
+    /**
+     * Try use limited number of channels to send a request if default channel is not responding.

Review comment:
       ```
       Sets the retry limit. When a request fails due to a connection error, and
       multiple server connections are available, Ignite will retry the request on
       every connection. When this property is greater than zero, Ignite will limit
       the number of retries.
       ```
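
One possible reading of the wording above, as a sketch only. It assumes the limit counts total attempts and can never exceed the number of available connections; the PR may define it differently. `RetryLimitExample` and `maxAttempts` are hypothetical names.

```java
/** Hypothetical helper illustrating one interpretation of the retry limit. */
class RetryLimitExample {
    /**
     * @param retryLimit Configured limit; zero (the default in this PR) means "no limit".
     * @param channelCnt Number of available server connections.
     * @return Number of connections the request may be tried on (assumed semantics).
     */
    static int maxAttempts(int retryLimit, int channelCnt) {
        return retryLimit > 0 ? Math.min(retryLimit, channelCnt) : channelCnt;
    }

    public static void main(String[] args) {
        System.out.println(maxAttempts(0, 3)); // unlimited: every connection may be tried
        System.out.println(maxAttempts(2, 3)); // limited by the property
    }
}
```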

##########
File path: 
modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
##########
@@ -115,18 +125,35 @@
     /** Reconnect throttling retries. See {@code reconnectThrottlingPeriod}. */
     private int reconnectThrottlingRetries = 3;
 
+    /** Retry limit. */
+    private int retryLimit = 0;
+
     /**
      * @return Host addresses.
      */
     public String[] getAddresses() {
-        return addrs;
+        return Optional.ofNullable(addrFinder)

Review comment:
       Let's revert this; there is no need to be clever here. `ClientConfiguration`
should be a simple POJO; the address-handling logic does not belong in it.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -381,78 +343,59 @@ private boolean affinityInfoIsUpToDate(int cacheId) {
                 .rangeClosed(r.portFrom(), r.portTo()).boxed()
                 .map(p -> new InetSocketAddress(r.host(), p))
             )
-            .collect(Collectors.toList());
-    }
-
-    /** */
-    private synchronized ClientChannel channel() {
-        if (closed)
-            throw new ClientException("Channel is closed");
-
-        try {
-            return channels[curChIdx].getOrCreateChannel();
-        }
-        catch (ClientConnectionException e) {
-            rollCurrentChannel();
-
-            throw e;
-        }
-    }
-
-    /** */
-    private synchronized void rollCurrentChannel() {
-        if (++curChIdx >= channels.length)
-            curChIdx = 0;
+            .collect(Collectors.toSet());
     }
 
     /**
-     * On current channel failure.
+     * Roll current default channel if specified holder equals to it.
      */
-    private synchronized void onChannelFailure(ClientChannel ch) {
-        // There is nothing wrong if curChIdx was concurrently changed, since channel was closed by another thread
-        // when current index was changed and no other wrong channel will be closed by current thread because
-        // onChannelFailure checks channel binded to the holder before closing it.
-        onChannelFailure(channels[curChIdx], ch);
-
-        chFailLsnrs.forEach(Runnable::run);
+    private void rollCurrentChannel(ClientChannelHolder hld) {
+        curChannelsGuard.writeLock().lock();
+        try {
+            ClientChannelHolder dfltHld = channels.get().get(curChIdx);
+            if (dfltHld == hld) {
+                int idx = curChIdx + 1;
+                if (idx >= channels.get().size())
+                    curChIdx = 0;
+                else
+                    curChIdx = idx;
+            }
+        } finally {
+            curChannelsGuard.writeLock().unlock();
+        }
     }
 
     /**
      * On channel of the specified holder failure.
      */
-    private synchronized void onChannelFailure(ClientChannelHolder hld, ClientChannel ch) {
-        if (ch == hld.ch && ch != null) {
+    private void onChannelFailure(ClientChannelHolder hld, ClientChannel ch) {
+        if (hld != null && ch != null && ch == hld.ch)
             hld.closeChannel();
 
-            if (hld == channels[curChIdx])
-                rollCurrentChannel();
-        }
+        rollCurrentChannel(hld);
+
+        chFailLsnrs.forEach(Runnable::run);
     }
 
     /**
      * Asynchronously try to establish a connection to all configured servers.
      */
-    private void initAllChannelsAsync() {
-        // Skip if there is already channels reinit scheduled.
-        if (scheduledChannelsReinit.compareAndSet(false, true)) {
-            asyncRunner.submit(
-                () -> {
-                    scheduledChannelsReinit.set(false);
-
-                    for (ClientChannelHolder hld : channels) {
-                        if (scheduledChannelsReinit.get() || closed)
-                            return; // New reinit task scheduled or channel is closed.
-
-                        try {
-                            hld.getOrCreateChannel(true);
-                        }
-                        catch (Exception ignore) {
-                            // No-op.
-                        }
+    void initAllChannelsAsync() {

Review comment:
       This method should stay `private`.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {

Review comment:
       Rename to `shouldStopChannelsReinit()`.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))
+            .forEach(addr -> addrs.get(addr).setValue(true));
+
+        // create new holders for new addrs
+        resolvedAddrs.stream()
+            .filter(addr -> !addrs.containsKey(addr))
+            .forEach(addr -> {
+                ClientChannelHolder hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addr));
+                addrs.put(addr, new T2<>(hld, false));
+            });
+
+        if (!stopInitCondition()) {
+            List<ClientChannelHolder> list = new ArrayList<>();
+            // The variable holds a new index of default channel after topology change.
+            // Suppose that reuse of the channel is better than open new connection.
+            int dfltChannelIdx = -1;
+
+            ClientChannelHolder currHolder = null;
+            if (curChIdx != -1)
+                currHolder = channels.get().get(curChIdx);
+
+            for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+                ClientChannelHolder hld = t.get1();
+                Boolean markForDelete = t.get2();
+
+                if (markForDelete == null) {
+                    // this channel is still in use
+                    list.add(hld);
+                    if (hld == currHolder)
+                        dfltChannelIdx = list.size() - 1;
+
+                }
+                else if (markForDelete) {
+                    // this holder should be deleted now
+                    nodeChannels.values().remove(hld);

Review comment:
       `nodeChannels.values().remove(hld)` requires iteration over the entire map.
Instead, loop over `addrs.entrySet()` and remove by key.
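
A rough sketch of the difference, under the assumption that the map being cleaned up is keyed by the same address type as the flags map. That may not match how `nodeChannels` is keyed in the PR, so every name here (`RemoveByKeyExample`, `marked`, `live`) is hypothetical, and `String` stands in for the real address and holder types.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical example: remove flagged entries by key instead of scanning values. */
class RemoveByKeyExample {
    /**
     * Removes from {@code live} every address whose flag in {@code marked} is true.
     *
     * @return Number of entries actually removed.
     */
    static int removeMarked(Map<String, Boolean> marked, Map<String, String> live) {
        int removed = 0;

        for (Map.Entry<String, Boolean> e : marked.entrySet()) {
            // Map.remove(key) is a direct lookup for HashMap,
            // whereas values().remove(value) scans the values until it finds a match.
            if (Boolean.TRUE.equals(e.getValue()) && live.remove(e.getKey()) != null)
                removed++;
        }

        return removed;
    }

    public static void main(String[] args) {
        Map<String, Boolean> marked = new HashMap<>();
        marked.put("host1:10800", true);   // flagged for deletion
        marked.put("host2:10800", null);   // still in use
        marked.put("host3:10800", false);  // newly added

        Map<String, String> live = new HashMap<>();
        live.put("host1:10800", "holder1");
        live.put("host2:10800", "holder2");
        live.put("host3:10800", "holder3");

        System.out.println(removeMarked(marked, live));
        System.out.println(live.keySet());
    }
}
```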

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,6 +415,196 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean stopInitCondition() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force enable to replace existing channels with new holders.
+     */
+    private synchronized void initChannelHolders(boolean force) {
+        // enable parallel threads to schedule new init of channel holders
+        scheduledChannelsReinit.set(false);
+
+        if (!force && channels.get() != null)
+            return;
+
+        Set<InetSocketAddress> resolvedAddrs = parseAddresses(clientCfg.getAddresses());
+
+        List<ClientChannelHolder> holders = Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+        // addr -> (holder, delete)
+        Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = holders.stream()
+            .collect(Collectors.toMap(
+                c -> c.chCfg.getAddress(),
+                c -> new T2<>(c, null)
+        ));
+
+        // mark for delete addrs that aren't provided by clientConfig now
+        addrs.keySet()
+            .stream()
+            .filter(addr -> !resolvedAddrs.contains(addr))
+            .forEach(addr -> addrs.get(addr).setValue(true));
+
+        // create new holders for new addrs
+        resolvedAddrs.stream()
+            .filter(addr -> !addrs.containsKey(addr))
+            .forEach(addr -> {
+                ClientChannelHolder hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addr));
+                addrs.put(addr, new T2<>(hld, false));
+            });
+
+        if (!stopInitCondition()) {
+            List<ClientChannelHolder> list = new ArrayList<>();
+            // The variable holds a new index of default channel after topology change.
+            // Suppose that reuse of the channel is better than open new connection.
+            int dfltChannelIdx = -1;
+
+            ClientChannelHolder currHolder = null;
+            if (curChIdx != -1)
+                currHolder = channels.get().get(curChIdx);
+
+            for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+                ClientChannelHolder hld = t.get1();
+                Boolean markForDelete = t.get2();
+
+                if (markForDelete == null) {
+                    // this channel is still in use
+                    list.add(hld);
+                    if (hld == currHolder)
+                        dfltChannelIdx = list.size() - 1;
+
+                }
+                else if (markForDelete) {
+                    // this holder should be deleted now
+                    nodeChannels.values().remove(hld);
+                    hld.close();
+                }
+                else {
+                    // this channel is new
+                    list.add(hld);
+                }
+            }
+
+            if (dfltChannelIdx == -1)
+                dfltChannelIdx = new Random().nextInt(list.size());
+
+            curChannelsGuard.writeLock().lock();
+            try {
+                channels.set(list);
+                curChIdx = dfltChannelIdx;
+            } finally {
+                curChannelsGuard.writeLock().unlock();
+            }
+        }
+    }
+
+    /** Initialization of channels. */
+    private void channelsInit(boolean force) {
+        if (!force && channels.get() != null)
+            return;
+
+        // Skip if there is already channels reinit scheduled.
+        // Flag is set back when a thread comes in synchronized initChannelHolders
+        if (scheduledChannelsReinit.compareAndSet(false, true)) {
+            initChannelHolders(force);
+
+            if (partitionAwarenessEnabled)
+                initAllChannelsAsync();
+        }
+    }
+
+    /**
+     * Apply specified {@code function} on a channel corresponding to specified {@code nodeId}.
+     */
+    private <T> T applyOnNodeChannel(UUID nodeId, Function<ClientChannel, T> function) {
+        ClientChannelHolder hld = null;
+        ClientChannel channel = null;
+
+        try {
+            hld = nodeChannels.get(nodeId);
+
+            channel = Optional

Review comment:
       Too clever, verbose, and memory-hungry. Just make it
`channel = hld == null ? null : hld.getOrCreateChannel();`.
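
The suggested ternary in isolation, as a small sketch. `NullSafeLookupExample` and its `Holder` are hypothetical, and `getOrCreateChannel()` here simply returns a string so the example runs on its own.

```java
/** Hypothetical example of a null-safe lookup without Optional. */
class NullSafeLookupExample {
    /** Hypothetical holder; the real one would open or reuse a connection. */
    static class Holder {
        String getOrCreateChannel() {
            return "channel";
        }
    }

    /** Returns the holder's channel, or null when there is no holder. */
    static String channelOf(Holder hld) {
        // No Optional wrapper and no lambda are allocated on this path.
        return hld == null ? null : hld.getOrCreateChannel();
    }

    public static void main(String[] args) {
        System.out.println(channelOf(null));
        System.out.println(channelOf(new Holder()));
    }
}
```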





