timoninmaxim commented on a change in pull request #8206:
URL: https://github.com/apache/ignite/pull/8206#discussion_r497489616



##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.client.ClientAddressFinder;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.processors.odbc.ClientListenerProcessor.CLIENT_LISTENER_PORT;
+
+/**
+ * Test partition awareness of thin client on changed topology.
+ */
+public class ThinClientPartitionAwarenessDiscoveryTest extends 
ThinClientAbstractPartitionAwarenessTest {
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Test that the client uses channels to all running nodes while new nodes start.
+     */
+    @Test
+    public void testClientDiscoveryNodesJoin() throws Exception {
+        for (int i = 0; i < MAX_CLUSTER_SIZE; ++i) {
+            startGrid(i);
+            awaitPartitionMapExchange();
+
+            int[] workChannels = IntStream.rangeClosed(0, i).toArray();
+
+            if (i == 0)
+                initClient(getClientConfigurationWithDiscovery(), 
workChannels);
+            else {
+                detectTopologyChange();
+                awaitChannelsInit(workChannels);
+            }
+
+            testPartitionAwareness(workChannels);
+        }
+    }
+
+    /**
+     * Test that the client uses channels to all running nodes while nodes stop.

Review comment:
       fixed.

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.client.ClientAddressFinder;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.processors.odbc.ClientListenerProcessor.CLIENT_LISTENER_PORT;
+
+/**
+ * Test partition awareness of thin client on changed topology.
+ */
+public class ThinClientPartitionAwarenessDiscoveryTest extends 
ThinClientAbstractPartitionAwarenessTest {
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Test that the client uses channels to all running nodes while new nodes start.

Review comment:
       fixed.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -473,16 +429,230 @@ public void addChannelFailListener(Runnable chFailLsnr) {
         chFailLsnrs.add(chFailLsnr);
     }
 
+    /** Should the channel initialization be stopped. */
+    private boolean shouldStopChannelsReinit() {
+        return scheduledChannelsReinit.get() || closed;
+    }
+
+    /**
+     * Init channel holders to all nodes.
+     * @param force whether to replace existing channels with new holders.
+     */
+    synchronized void initChannelHolders(boolean force) {
+        List<ClientChannelHolder> holders = channels;
+
+        if (!force && holders != null)
+            return;
+
+        startChannelsReInit = System.currentTimeMillis();
+
+        // Enable parallel threads to schedule new init of channel holders.
+        scheduledChannelsReinit.set(false);
+
+        Set<InetSocketAddress> newAddrs = null;
+
+        if (clientCfg.getAddressesFinder() != null) {
+            String[] hostAddrs = 
clientCfg.getAddressesFinder().getServerAddresses();
+
+            if (hostAddrs.length == 0)
+                throw new ClientException("Empty addresses");
+
+            if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
+                newAddrs = parsedAddresses(hostAddrs);
+                prevHostAddrs = hostAddrs;
+            }
+        } else if (holders == null)
+            newAddrs = parsedAddresses(clientCfg.getAddresses());
+
+        if (newAddrs == null) {
+            finishChannelsReInit = System.currentTimeMillis();
+            return;
+        }
+
+        Map<InetSocketAddress, ClientChannelHolder> curAddrs = 
Collections.emptyMap();
+        Set<InetSocketAddress> allAddrs = new HashSet<>(newAddrs);
+
+        if (holders != null) {
+            curAddrs = holders.stream()
+                .collect(Collectors.toMap(h -> h.chCfg.getAddress(), h -> h));
+
+            allAddrs.addAll(curAddrs.keySet());
+        }
+
+        List<ClientChannelHolder> reinitHolders = new ArrayList<>();
+        // Holds the new index of the default channel after a topology change.
+        // Reusing the existing channel is assumed to be better than opening a new connection.
+        int dfltChannelIdx = -1;
+
+        ClientChannelHolder currDfltHolder = null;
+        int idx = curChIdx;
+        if (idx != -1)
+            currDfltHolder = holders.get(idx);
+
+        for (InetSocketAddress addr : allAddrs) {
+            if (shouldStopChannelsReinit())
+                return;
+
+            // Obsolete addr, to be removed.
+            if (!newAddrs.contains(addr)) {
+                curAddrs.get(addr).close();
+
+                continue;
+            }
+
+            // Create new holders for new addrs.
+            if (!curAddrs.containsKey(addr)) {
+                ClientChannelHolder hld = new ClientChannelHolder(new 
ClientChannelConfiguration(clientCfg, addr));
+                reinitHolders.add(hld);
+
+                continue;
+            }
+
+            // This holder is up to date.
+            ClientChannelHolder hld = curAddrs.get(addr);
+            reinitHolders.add(hld);
+            if (hld == currDfltHolder)
+                dfltChannelIdx = reinitHolders.size() - 1;
+        }
+
+        if (dfltChannelIdx == -1)
+            dfltChannelIdx = new Random().nextInt(reinitHolders.size());
+
+        curChannelsGuard.writeLock().lock();
+        try {
+            channels = reinitHolders;
+            curChIdx = dfltChannelIdx;
+        }
+        finally {
+            curChannelsGuard.writeLock().unlock();
+        }
+
+        finishChannelsReInit = System.currentTimeMillis();
+    }
+
+    /** Initialization of channels. */
+    private void channelsInit(boolean force) {
+        if (!force && channels != null)
+            return;
+
+        // Skip if a channels reinit is already scheduled.
+        // The flag is reset when a thread enters the synchronized initChannelHolders method.
+        if (scheduledChannelsReinit.compareAndSet(false, true)) {
+            initChannelHolders(force);
+
+            if (partitionAwarenessEnabled)
+                initAllChannelsAsync();
+        }
+    }
+
+    /**
+     * Apply the specified {@code function} on the channel corresponding to the specified {@code nodeId}.
+     */
+    private <T> T applyOnNodeChannel(UUID nodeId, Function<ClientChannel, T> 
function) {
+        ClientChannelHolder hld = null;
+        ClientChannel channel = null;
+
+        try {
+            hld = nodeChannels.get(nodeId);
+
+            channel = hld != null ? hld.getOrCreateChannel() : null;
+
+            if (channel != null)
+                return function.apply(channel);
+
+        } catch (ClientConnectionException e) {
+            onChannelFailure(hld, channel);
+        }
+
+        return null;
+    }
+
+    /**
+     * Apply the specified {@code function} on any of the available channels.
+     */
+    private <T> T applyOnDefaultChannel(Function<ClientChannel, T> function) {
+        List<ClientChannelHolder> holders = channels;
+
+        if (holders == null)
+            throw new ClientException("Connections to nodes aren't 
initialized.");
+
+        int size = holders.size();
+
+        int attemptsLimit = clientCfg.getRetryLimit() > 0 ?
+            Math.min(clientCfg.getRetryLimit(), size) : size;
+
+        ClientConnectionException failure = null;
+
+        for (int attempt = 0; attempt < attemptsLimit; attempt++) {
+            ClientChannelHolder hld = null;
+            ClientChannel c = null;
+            try {
+                if (closed)
+                    throw new ClientException("Channel is closed");
+
+                curChannelsGuard.readLock().lock();
+                try {
+                    hld = channels.get(curChIdx);
+                } finally {
+                    curChannelsGuard.readLock().unlock();
+                }
+
+                c = hld.getOrCreateChannel();
+                if (c != null)
+                    return function.apply(c);
+            }
+            catch (ClientConnectionException e) {
+                if (failure == null)
+                    failure = e;
+                else
+                    failure.addSuppressed(e);
+
+                onChannelFailure(hld, c);
+            }
+        }
+
+        throw failure;
+    }
+
+    /**
+     * Try to apply the specified {@code function} on a channel corresponding to {@code tryNodeId}.
+     * If that fails, apply the function on any available channel.
+     */
+    private <T> T apply(UUID tryNodeId, Function<ClientChannel, T> function) {

Review comment:
       fixed.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -519,29 +689,44 @@ private boolean applyReconnectionThrottling() {
         /**
          * Get or create channel.
          */
-        private synchronized ClientChannel getOrCreateChannel()
+        private ClientChannel getOrCreateChannel()
             throws ClientConnectionException, ClientAuthenticationException, 
ClientProtocolError {
             return getOrCreateChannel(false);
         }
 
         /**
          * Get or create channel.
          */
-        private synchronized ClientChannel getOrCreateChannel(boolean 
ignoreThrottling)
+        private ClientChannel getOrCreateChannel(boolean ignoreThrottling)
             throws ClientConnectionException, ClientAuthenticationException, 
ClientProtocolError {
-            if (ch == null) {
-                if (!ignoreThrottling && applyReconnectionThrottling())
-                    throw new ClientConnectionException("Reconnect is not 
allowed due to applied throttling");
+            if (ch == null && !close) {
+                synchronized (this) {
+                    if (close)
+                        return null;
+
+                    if (ch != null)
+                        return ch;
 
-                ch = chFactory.apply(chCfg);
+                    if (!ignoreThrottling && applyReconnectionThrottling())
+                        throw new ClientConnectionException("Reconnect is not 
allowed due to applied throttling");
 
-                if (ch.serverNodeId() != null) {
-                    
ch.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged);
-                    ch.addNotificationListener(ReliableChannel.this);
+                    ClientChannel channel = chFactory.apply(chCfg);
 
-                    nodeChannels.values().remove(this);
+                    if (channel.serverNodeId() != null) {
+                        
channel.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged);
+                        channel.addNotificationListener(ReliableChannel.this);
 
-                    nodeChannels.putIfAbsent(ch.serverNodeId(), this);
+                        if (serverNodeId == null)

Review comment:
       fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to