timoninmaxim commented on a change in pull request #8206: URL: https://github.com/apache/ignite/pull/8206#discussion_r497489616
########## File path: modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.thin; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.ignite.client.ClientAddressFinder; +import org.apache.ignite.client.ClientCache; +import org.apache.ignite.configuration.ClientConfiguration; +import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.junit.Test; + +import static org.apache.ignite.internal.processors.odbc.ClientListenerProcessor.CLIENT_LISTENER_PORT; + +/** + * Test partition awareness of thin client on changed topology. + */ +public class ThinClientPartitionAwarenessDiscoveryTest extends ThinClientAbstractPartitionAwarenessTest { + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * Test that client use channels to all running nodes while new nodes start + */ + @Test + public void testClientDiscoveryNodesJoin() throws Exception { + for (int i = 0; i < MAX_CLUSTER_SIZE; ++i) { + startGrid(i); + awaitPartitionMapExchange(); + + int[] workChannels = IntStream.rangeClosed(0, i).toArray(); + + if (i == 0) + initClient(getClientConfigurationWithDiscovery(), workChannels); + else { + detectTopologyChange(); + awaitChannelsInit(workChannels); + } + + testPartitionAwareness(workChannels); + } + } + + /** + * Test that client use channels to all running nodes while nodes stop Review comment: fixed. ########## File path: modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessDiscoveryTest.java ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.thin; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.ignite.client.ClientAddressFinder; +import org.apache.ignite.client.ClientCache; +import org.apache.ignite.configuration.ClientConfiguration; +import org.apache.ignite.internal.IgnitionEx; +import org.apache.ignite.internal.processors.cache.IgniteInternalCache; +import org.junit.Test; + +import static org.apache.ignite.internal.processors.odbc.ClientListenerProcessor.CLIENT_LISTENER_PORT; + +/** + * Test partition awareness of thin client on changed topology. + */ +public class ThinClientPartitionAwarenessDiscoveryTest extends ThinClientAbstractPartitionAwarenessTest { + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * Test that client use channels to all running nodes while new nodes start Review comment: fixed. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java ########## @@ -473,16 +429,230 @@ public void addChannelFailListener(Runnable chFailLsnr) { chFailLsnrs.add(chFailLsnr); } + /** Should the channel initialization be stopped. */ + private boolean shouldStopChannelsReinit() { + return scheduledChannelsReinit.get() || closed; + } + + /** + * Init channel holders to all nodes. + * @param force enable to replace existing channels with new holders. + */ + synchronized void initChannelHolders(boolean force) { + List<ClientChannelHolder> holders = channels; + + if (!force && holders != null) + return; + + startChannelsReInit = System.currentTimeMillis(); + + // Enable parallel threads to schedule new init of channel holders. + scheduledChannelsReinit.set(false); + + Set<InetSocketAddress> newAddrs = null; + + if (clientCfg.getAddressesFinder() != null) { + String[] hostAddrs = clientCfg.getAddressesFinder().getServerAddresses(); + + if (hostAddrs.length == 0) + throw new ClientException("Empty addresses"); + + if (!Arrays.equals(hostAddrs, prevHostAddrs)) { + newAddrs = parsedAddresses(hostAddrs); + prevHostAddrs = hostAddrs; + } + } else if (holders == null) + newAddrs = parsedAddresses(clientCfg.getAddresses()); + + if (newAddrs == null) { + finishChannelsReInit = System.currentTimeMillis(); + return; + } + + Map<InetSocketAddress, ClientChannelHolder> curAddrs = Collections.emptyMap(); + Set<InetSocketAddress> allAddrs = new HashSet<>(newAddrs); + + if (holders != null) { + curAddrs = holders.stream() + .collect(Collectors.toMap(h -> h.chCfg.getAddress(), h -> h)); + + allAddrs.addAll(curAddrs.keySet()); + } + + List<ClientChannelHolder> reinitHolders = new ArrayList<>(); + // The variable holds a new index of default channel after topology change. + // Suppose that reuse of the channel is better than open new connection. + int dfltChannelIdx = -1; + + ClientChannelHolder currDfltHolder = null; + int idx = curChIdx; + if (idx != -1) + currDfltHolder = holders.get(idx); + + for (InetSocketAddress addr : allAddrs) { + if (shouldStopChannelsReinit()) + return; + + // Obsolete addr, to be removed. + if (!newAddrs.contains(addr)) { + curAddrs.get(addr).close(); + + continue; + } + + // Create new holders for new addrs. + if (!curAddrs.containsKey(addr)) { + ClientChannelHolder hld = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addr)); + reinitHolders.add(hld); + + continue; + } + + // This holder is up to date. + ClientChannelHolder hld = curAddrs.get(addr); + reinitHolders.add(hld); + if (hld == currDfltHolder) + dfltChannelIdx = reinitHolders.size() - 1; + } + + if (dfltChannelIdx == -1) + dfltChannelIdx = new Random().nextInt(reinitHolders.size()); + + curChannelsGuard.writeLock().lock(); + try { + channels = reinitHolders; + curChIdx = dfltChannelIdx; + } + finally { + curChannelsGuard.writeLock().unlock(); + } + + finishChannelsReInit = System.currentTimeMillis(); + } + + /** Initialization of channels. */ + private void channelsInit(boolean force) { + if (!force && channels != null) + return; + + // Skip if there is already channels reinit scheduled. + // Flag is set back when a thread comes in synchronized initChannelHolders. + if (scheduledChannelsReinit.compareAndSet(false, true)) { + initChannelHolders(force); + + if (partitionAwarenessEnabled) + initAllChannelsAsync(); + } + } + + /** + * Apply specified {@code function} on a channel corresponding to specified {@code nodeId}. + */ + private <T> T applyOnNodeChannel(UUID nodeId, Function<ClientChannel, T> function) { + ClientChannelHolder hld = null; + ClientChannel channel = null; + + try { + hld = nodeChannels.get(nodeId); + + channel = hld != null ? hld.getOrCreateChannel() : null; + + if (channel != null) + return function.apply(channel); + + } catch (ClientConnectionException e) { + onChannelFailure(hld, channel); + } + + return null; + } + + /** + * Apply specified {@code function} on any of available channel. + */ + private <T> T applyOnDefaultChannel(Function<ClientChannel, T> function) { + List<ClientChannelHolder> holders = channels; + + if (holders == null) + throw new ClientException("Connections to nodes aren't initialized."); + + int size = holders.size(); + + int attemptsLimit = clientCfg.getRetryLimit() > 0 ? + Math.min(clientCfg.getRetryLimit(), size) : size; + + ClientConnectionException failure = null; + + for (int attempt = 0; attempt < attemptsLimit; attempt++) { + ClientChannelHolder hld = null; + ClientChannel c = null; + try { + if (closed) + throw new ClientException("Channel is closed"); + + curChannelsGuard.readLock().lock(); + try { + hld = channels.get(curChIdx); + } finally { + curChannelsGuard.readLock().unlock(); + } + + c = hld.getOrCreateChannel(); + if (c != null) + return function.apply(c); + } + catch (ClientConnectionException e) { + if (failure == null) + failure = e; + else + failure.addSuppressed(e); + + onChannelFailure(hld, c); + } + } + + throw failure; + } + + /** + * Try apply specified {@code function} on a channel corresponding to {@code tryNodeId}. + * If failed then apply the function on any available channel. + */ + private <T> T apply(UUID tryNodeId, Function<ClientChannel, T> function) { Review comment: fixed. ########## File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java ########## @@ -519,29 +689,44 @@ private boolean applyReconnectionThrottling() { /** * Get or create channel. */ - private synchronized ClientChannel getOrCreateChannel() + private ClientChannel getOrCreateChannel() throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError { return getOrCreateChannel(false); } /** * Get or create channel. */ - private synchronized ClientChannel getOrCreateChannel(boolean ignoreThrottling) + private ClientChannel getOrCreateChannel(boolean ignoreThrottling) throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError { - if (ch == null) { - if (!ignoreThrottling && applyReconnectionThrottling()) - throw new ClientConnectionException("Reconnect is not allowed due to applied throttling"); + if (ch == null && !close) { + synchronized (this) { + if (close) + return null; + + if (ch != null) + return ch; - ch = chFactory.apply(chCfg); + if (!ignoreThrottling && applyReconnectionThrottling()) + throw new ClientConnectionException("Reconnect is not allowed due to applied throttling"); - if (ch.serverNodeId() != null) { - ch.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged); - ch.addNotificationListener(ReliableChannel.this); + ClientChannel channel = chFactory.apply(chCfg); - nodeChannels.values().remove(this); + if (channel.serverNodeId() != null) { + channel.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged); + channel.addNotificationListener(ReliableChannel.this); - nodeChannels.putIfAbsent(ch.serverNodeId(), this); + if (serverNodeId == null) Review comment: fixed. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
