alex-plekhanov commented on a change in pull request #8206:
URL: https://github.com/apache/ignite/pull/8206#discussion_r504828714
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -512,80 +504,81 @@ private boolean affinityInfoIsUpToDate(int cacheId) {
return ranges.stream()
.flatMap(r -> IntStream
.rangeClosed(r.portFrom(), r.portTo()).boxed()
- .map(p -> new InetSocketAddress(r.host(), p))
+ .map(p -> InetSocketAddress.createUnresolved(r.host(), p))
)
- .collect(Collectors.toList());
+ .collect(Collectors.toMap(a -> a, a -> 1, Integer::sum));
}
- /** */
- private synchronized ClientChannel channel() {
- if (closed)
- throw new ClientException("Channel is closed");
-
+ /**
+ * Roll current default channel if specified holder equals to it.
+ */
+ private void rollCurrentChannel(ClientChannelHolder hld) {
+ curChannelsGuard.writeLock().lock();
try {
- return channels[curChIdx].getOrCreateChannel();
- }
- catch (ClientConnectionException e) {
- rollCurrentChannel();
+ int idx = curChIdx;
+ List<ClientChannelHolder> holders = channels;
- throw e;
- }
- }
+ ClientChannelHolder dfltHld = holders.get(idx);
- /** */
- private synchronized void rollCurrentChannel() {
- if (++curChIdx >= channels.length)
- curChIdx = 0;
+ if (dfltHld == hld) {
+ idx += 1;
+
+ if (idx >= holders.size())
+ curChIdx = 0;
+ else
+ curChIdx = idx;
+ }
+ } finally {
+ curChannelsGuard.writeLock().unlock();
+ }
}
/**
* On current channel failure.
*/
- private synchronized void onChannelFailure(ClientChannel ch) {
+ private void onChannelFailure(ClientChannel ch) {
// There is nothing wrong if curChIdx was concurrently changed, since
channel was closed by another thread
// when current index was changed and no other wrong channel will be
closed by current thread because
// onChannelFailure checks channel binded to the holder before closing
it.
- onChannelFailure(channels[curChIdx], ch);
-
- chFailLsnrs.forEach(Runnable::run);
+ onChannelFailure(channels.get(curChIdx), ch);
}
/**
* On channel of the specified holder failure.
*/
- private synchronized void onChannelFailure(ClientChannelHolder hld,
ClientChannel ch) {
- if (ch == hld.ch && ch != null) {
+ private void onChannelFailure(ClientChannelHolder hld, ClientChannel ch) {
+ if (ch != null && ch == hld.ch)
hld.closeChannel();
- if (hld == channels[curChIdx])
- rollCurrentChannel();
- }
+ chFailLsnrs.forEach(Runnable::run);
+
+ if (scheduledChannelsReinit.get())
+ channelsInit(true);
+ else
+ rollCurrentChannel(hld);
Review comment:
Also, if new default channel is assigned by `channelsInit()`, than
`rollCurrentChannel(hld)` will do no-op, since `hld` already not a default
channel.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]