timoninmaxim commented on a change in pull request #8206:
URL: https://github.com/apache/ignite/pull/8206#discussion_r505255508
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -606,16 +603,271 @@ public void addChannelFailListener(Runnable chFailLsnr) {
chFailLsnrs.add(chFailLsnr);
}
+ /**
+ * Should the channel initialization be stopped.
+ */
+ private boolean shouldStopChannelsReinit() {
+ return scheduledChannelsReinit.get() || closed;
+ }
+
+ /**
+ * Init channel holders to all nodes.
+ * @param force enable to replace existing channels with new holders.
+ * @return {@code true} if holders are reinited and {@code false} if the
initialization was interrupted.
+ */
+ synchronized boolean initChannelHolders(boolean force) {
+ List<ClientChannelHolder> holders = channels;
+
+ if (!force && holders != null)
+ return true;
+
+ startChannelsReInit = System.currentTimeMillis();
+
+ // Enable parallel threads to schedule new init of channel holders.
+ scheduledChannelsReinit.set(false);
+
+ Map<InetSocketAddress, Integer> newAddrs = null;
+
+ if (clientCfg.getAddressesFinder() != null) {
+ String[] hostAddrs = clientCfg.getAddressesFinder().getAddresses();
+
+ if (hostAddrs.length == 0)
+ throw new ClientException("Empty addresses");
+
+ if (!Arrays.equals(hostAddrs, prevHostAddrs)) {
+ newAddrs = parsedAddresses(hostAddrs);
+ prevHostAddrs = hostAddrs;
+ }
+ } else if (holders == null)
+ newAddrs = parsedAddresses(clientCfg.getAddresses());
+
+ if (newAddrs == null) {
+ finishChannelsReInit = System.currentTimeMillis();
+ return true;
+ }
+
+ Map<InetSocketAddress, ClientChannelHolder> curAddrs = new HashMap<>();
+ Set<InetSocketAddress> allAddrs = new HashSet<>(newAddrs.keySet());
+
+ if (holders != null) {
+ for (int i = 0; i < holders.size(); i++) {
+ ClientChannelHolder h = holders.get(i);
+
+ curAddrs.put(h.chCfg.getAddress(), h);
+ allAddrs.add(h.chCfg.getAddress());
+ }
+ }
+
+ List<ClientChannelHolder> reinitHolders = new ArrayList<>();
+
+ // The variable holds a new index of default channel after topology
change.
+ // Suppose that reuse of the channel is better than open new
connection.
+ int dfltChannelIdx = -1;
+
+ ClientChannelHolder currDfltHolder = null;
+
+ int idx = curChIdx;
+
+ if (idx != -1)
+ currDfltHolder = holders.get(idx);
+
+ for (InetSocketAddress addr : allAddrs) {
+ if (shouldStopChannelsReinit())
+ return false;
+
+ // Obsolete addr, to be removed.
+ if (!newAddrs.containsKey(addr)) {
+ curAddrs.get(addr).close();
+
+ continue;
+ }
+
+ // Create new holders for new addrs.
+ if (!curAddrs.containsKey(addr)) {
+ ClientChannelHolder hld = new ClientChannelHolder(new
ClientChannelConfiguration(clientCfg, addr));
+
+ for (int i = 0; i < newAddrs.get(addr); i++)
+ reinitHolders.add(hld);
+
+ continue;
+ }
+
+ // This holder is up to date.
+ ClientChannelHolder hld = curAddrs.get(addr);
+
+ for (int i = 0; i < newAddrs.get(addr); i++)
+ reinitHolders.add(hld);
+
+ if (hld == currDfltHolder)
+ dfltChannelIdx = reinitHolders.size() - 1;
+ }
+
+ if (dfltChannelIdx == -1)
+ dfltChannelIdx = new Random().nextInt(reinitHolders.size());
+
+ curChannelsGuard.writeLock().lock();
+ try {
+ channels = reinitHolders;
+ curChIdx = dfltChannelIdx;
+ }
+ finally {
+ curChannelsGuard.writeLock().unlock();
+ }
+
+ finishChannelsReInit = System.currentTimeMillis();
+ return true;
+ }
+
+ /**
+ * Establishing connections to servers. If partition awareness feature is
enabled connections are created
+ * for every configured server. Otherwise only default channel is
connected.
+ */
+ void channelsInit(boolean force) {
+ if (!force && channels != null)
Review comment:
fixed
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -512,80 +504,81 @@ private boolean affinityInfoIsUpToDate(int cacheId) {
return ranges.stream()
.flatMap(r -> IntStream
.rangeClosed(r.portFrom(), r.portTo()).boxed()
- .map(p -> new InetSocketAddress(r.host(), p))
+ .map(p -> InetSocketAddress.createUnresolved(r.host(), p))
)
- .collect(Collectors.toList());
+ .collect(Collectors.toMap(a -> a, a -> 1, Integer::sum));
}
- /** */
- private synchronized ClientChannel channel() {
- if (closed)
- throw new ClientException("Channel is closed");
-
+ /**
+ * Roll current default channel if specified holder equals to it.
+ */
+ private void rollCurrentChannel(ClientChannelHolder hld) {
+ curChannelsGuard.writeLock().lock();
try {
- return channels[curChIdx].getOrCreateChannel();
- }
- catch (ClientConnectionException e) {
- rollCurrentChannel();
+ int idx = curChIdx;
+ List<ClientChannelHolder> holders = channels;
- throw e;
- }
- }
+ ClientChannelHolder dfltHld = holders.get(idx);
- /** */
- private synchronized void rollCurrentChannel() {
- if (++curChIdx >= channels.length)
- curChIdx = 0;
+ if (dfltHld == hld) {
+ idx += 1;
+
+ if (idx >= holders.size())
+ curChIdx = 0;
+ else
+ curChIdx = idx;
+ }
+ } finally {
+ curChannelsGuard.writeLock().unlock();
+ }
}
/**
* On current channel failure.
*/
- private synchronized void onChannelFailure(ClientChannel ch) {
+ private void onChannelFailure(ClientChannel ch) {
// There is nothing wrong if curChIdx was concurrently changed, since
channel was closed by another thread
// when current index was changed and no other wrong channel will be
closed by current thread because
// onChannelFailure checks channel binded to the holder before closing
it.
- onChannelFailure(channels[curChIdx], ch);
-
- chFailLsnrs.forEach(Runnable::run);
+ onChannelFailure(channels.get(curChIdx), ch);
}
/**
* On channel of the specified holder failure.
*/
- private synchronized void onChannelFailure(ClientChannelHolder hld,
ClientChannel ch) {
- if (ch == hld.ch && ch != null) {
+ private void onChannelFailure(ClientChannelHolder hld, ClientChannel ch) {
+ if (ch != null && ch == hld.ch)
hld.closeChannel();
- if (hld == channels[curChIdx])
- rollCurrentChannel();
- }
+ chFailLsnrs.forEach(Runnable::run);
+
+ if (scheduledChannelsReinit.get())
Review comment:
fixed
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]