timoninmaxim commented on a change in pull request #8206:
URL: https://github.com/apache/ignite/pull/8206#discussion_r504924954
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -512,80 +504,81 @@ private boolean affinityInfoIsUpToDate(int cacheId) {
return ranges.stream()
.flatMap(r -> IntStream
.rangeClosed(r.portFrom(), r.portTo()).boxed()
- .map(p -> new InetSocketAddress(r.host(), p))
+ .map(p -> InetSocketAddress.createUnresolved(r.host(), p))
)
- .collect(Collectors.toList());
+ .collect(Collectors.toMap(a -> a, a -> 1, Integer::sum));
}
- /** */
- private synchronized ClientChannel channel() {
- if (closed)
- throw new ClientException("Channel is closed");
-
+ /**
+ * Roll current default channel if specified holder equals to it.
+ */
+ private void rollCurrentChannel(ClientChannelHolder hld) {
+ curChannelsGuard.writeLock().lock();
try {
- return channels[curChIdx].getOrCreateChannel();
- }
- catch (ClientConnectionException e) {
- rollCurrentChannel();
+ int idx = curChIdx;
+ List<ClientChannelHolder> holders = channels;
- throw e;
- }
- }
+ ClientChannelHolder dfltHld = holders.get(idx);
- /** */
- private synchronized void rollCurrentChannel() {
- if (++curChIdx >= channels.length)
- curChIdx = 0;
+ if (dfltHld == hld) {
+ idx += 1;
+
+ if (idx >= holders.size())
+ curChIdx = 0;
+ else
+ curChIdx = idx;
+ }
+ } finally {
+ curChannelsGuard.writeLock().unlock();
+ }
}
/**
* On current channel failure.
*/
- private synchronized void onChannelFailure(ClientChannel ch) {
+ private void onChannelFailure(ClientChannel ch) {
// There is nothing wrong if curChIdx was concurrently changed, since
channel was closed by another thread
// when current index was changed and no other wrong channel will be
closed by current thread because
// onChannelFailure checks channel binded to the holder before closing
it.
- onChannelFailure(channels[curChIdx], ch);
-
- chFailLsnrs.forEach(Runnable::run);
+ onChannelFailure(channels.get(curChIdx), ch);
}
/**
* On channel of the specified holder failure.
*/
- private synchronized void onChannelFailure(ClientChannelHolder hld,
ClientChannel ch) {
- if (ch == hld.ch && ch != null) {
+ private void onChannelFailure(ClientChannelHolder hld, ClientChannel ch) {
+ if (ch != null && ch == hld.ch)
hld.closeChannel();
- if (hld == channels[curChIdx])
- rollCurrentChannel();
- }
+ chFailLsnrs.forEach(Runnable::run);
+
+ if (scheduledChannelsReinit.get())
+ channelsInit(true);
+ else
+ rollCurrentChannel(hld);
Review comment:
Actually there is an invokation sequence for default case: `channelsInit
-> applyOnDefaultChannel` to establish connection with default channel (that
checks that connection is OK). So rolling is not required too. Also rolling at
least acquires lock so it is not free.
So, there is a matrix of cases for channel failure:
1. Fixed addreses, topology changes -> Channel is rolled with `channelsInit
-> applyOnDefaultChannel`;
2. Fixed addresses, topology does not change -> `rollCurrentChannel`;
3. Dynamic address, topology changes -> channels are reinited twice, rolls
with `applyOnDefaultChannel`;
4. Dynamic address, topology does not change -> `rollCurrentChannel`.
So, there is an issue with double initialization for dynamic addresses only.
I will fix it by skipping second initialization as describe in other comment.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]