timoninmaxim commented on a change in pull request #8206:
URL: https://github.com/apache/ignite/pull/8206#discussion_r482814669
##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -98,265 +101,291 @@
/** Channels reinit was scheduled. */
private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
- /** Affinity map update is in progress. */
- private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
-
/** Channel is closed. */
private volatile boolean closed;
/** Fail (disconnect) listeners. */
- private ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+ private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
- /**
- * Constructor.
- */
- ReliableChannel(
- Function<ClientChannelConfiguration, ClientChannel> chFactory,
- ClientConfiguration clientCfg,
- IgniteBinary binary
- ) throws ClientException {
+ /** Topology change filters: decide whether a topology change should trigger channels re-init. */
+ private final ArrayList<Predicate<ClientChannel>> topChangeFilters = new ArrayList<>();
+
+ /** Guard channels and curChIdx together. */
+ private final ReadWriteLock curChannelsGuard = new ReentrantReadWriteLock();
+
+ /** Constructor. */
+ ReliableChannel(ClientConfiguration clientCfg,
+ Function<ClientChannelConfiguration, ClientChannel> chFactory,
+ boolean initAllChannels) {
if (chFactory == null)
throw new NullPointerException("chFactory");
if (clientCfg == null)
throw new NullPointerException("clientCfg");
+ this.clientCfg = clientCfg;
this.chFactory = chFactory;
+ this.initAllChannels = initAllChannels;
+ }
+
+ /** Whether channel initialization should be stopped. */
+ private boolean stopInitCondition() {
+ return scheduledChannelsReinit.get() || closed;
+ }
- List<InetSocketAddress> addrs = parseAddresses(clientCfg.getAddresses());
+ /** Callback invoked after a new ClientChannel has been created. */
+ private final BiConsumer<ClientChannelHolder, ClientChannel> onChannelCreate = (holder, ch) -> {
+ ch.addTopologyChangeListener(channel -> {
+ if (topChangeFilters.stream().allMatch(s -> s.test(channel)))
+ channelsInit(true);
+ });
- channels = new ClientChannelHolder[addrs.size()];
+ ch.addNotificationListener(this);
- for (int i = 0; i < channels.length; i++)
- channels[i] = new ClientChannelHolder(new ClientChannelConfiguration(clientCfg, addrs.get(i)));
+ nodeChannels.values().remove(holder);
+ nodeChannels.put(ch.serverNodeId(), holder);
+ };
- curChIdx = new Random().nextInt(channels.length); // We already verified there is at least one address.
+ /** Callback invoked after a ClientChannel has been closed. */
+ private final Consumer<ClientChannel> onChannelClose = ch -> {
+ for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+ lsnr.accept(ch);
+ };
- partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() && channels.length > 1;
+ /**
+ * Initializes channel holders for all nodes.
+ * @param force If {@code true}, replaces existing channels with new holders.
+ */
+ private synchronized void initChannelHolders(boolean force) {
+ // Reset the flag so that parallel threads can schedule a new init of channel holders.
+ scheduledChannelsReinit.set(false);
+
+ if (!force && channels.get() != null)
+ return;
+
+ List<InetSocketAddress> resolvedAddrs = parseAddresses(clientCfg.getAddresses());
+
+ List<ClientChannelHolder> holders = Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+ // addr -> (holder, delete)
+ Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs = holders.stream()
+ .collect(Collectors.toMap(
+ c -> c.chCfg.getAddress(),
+ c -> new T2<>(c, null)
+ ));
+
+ // Mark for deletion the addresses that are no longer provided by the client configuration.
+ addrs.keySet()
+ .stream()
+ .filter(addr -> !resolvedAddrs.contains(addr))
+ .forEach(addr -> addrs.get(addr).setValue(true));
+
+ // create new holders for new addrs
+ resolvedAddrs.stream()
+ .filter(addr -> !addrs.containsKey(addr))
+ .forEach(addr -> {
+ ClientChannelHolder hld = new ClientChannelHolder(
+ new ClientChannelConfiguration(clientCfg, addr), chFactory, onChannelCreate, onChannelClose);
+
+ addrs.put(addr, new T2<>(hld, false));
+ });
+
+ if (!stopInitCondition()) {
+ List<ClientChannelHolder> list = new ArrayList<>();
+ // The variable holds the new index of the default channel after a topology change.
+ // We assume that reusing an existing channel is better than opening a new connection.
+ int dfltChannelIdx = -1;
+
+ ClientChannelHolder currHolder = null;
+ if (curChIdx != -1)
+ currHolder = channels.get().get(curChIdx);
+
+ for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+ ClientChannelHolder hld = t.get1();
+ Boolean markForDelete = t.get2();
+
+ if (markForDelete == null) {
+ // this channel is still in use
+ list.add(hld);
+ if (hld == currHolder)
+ dfltChannelIdx = list.size() - 1;
- affinityCtx = new ClientCacheAffinityContext(binary);
+ }
+ else if (markForDelete) {
+ // this holder should be deleted now
+ nodeChannels.values().remove(hld);
+ hld.close();
+ }
+ else {
+ // this channel is new
+ list.add(hld);
+ }
+ }
- ClientConnectionException lastEx = null;
+ if (dfltChannelIdx == -1)
+ dfltChannelIdx = new Random().nextInt(list.size());
- for (int i = 0; i < channels.length; i++) {
+ curChannelsGuard.writeLock().lock();
Review comment:
This lock guards two variables that must stay in sync: channels (an AtomicReference) and curChIdx.
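
For illustration, here is a minimal sketch of the pattern described above: a ReentrantReadWriteLock keeping a list reference and an index into that list consistent as a pair. The class and method names (ChannelRegistry, swap, pickCurrent) are hypothetical and not taken from the PR; the real code guards the channels AtomicReference and curChIdx with curChannelsGuard.

```java
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical sketch: a read-write lock guarding a list and an index into it as one unit. */
public class ChannelRegistry<T> {
    /** Guards 'channels' and 'curIdx' together (analogous to curChannelsGuard). */
    private final ReadWriteLock guard = new ReentrantReadWriteLock();

    /** Current list of channel holders. */
    private List<T> channels;

    /** Index of the default channel within 'channels'. */
    private int curIdx = -1;

    /** Replaces the list and the index under the write lock, so readers never see a mismatched pair. */
    public void swap(List<T> newChannels, int newIdx) {
        guard.writeLock().lock();

        try {
            channels = newChannels;
            curIdx = newIdx;
        }
        finally {
            guard.writeLock().unlock();
        }
    }

    /** Reads both fields under the read lock, so the index always refers to the current list. */
    public T pickCurrent() {
        guard.readLock().lock();

        try {
            return channels != null && curIdx >= 0 && curIdx < channels.size() ? channels.get(curIdx) : null;
        }
        finally {
            guard.readLock().unlock();
        }
    }
}
```

The write lock is held only for the brief swap of both fields (as initChannelHolders does around curChannelsGuard.writeLock()), while readers take the cheaper read lock, so request routing can never observe an index that points into a stale or resized list.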
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]