ptupitsyn commented on a change in pull request #8206:
URL: https://github.com/apache/ignite/pull/8206#discussion_r482730378
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -62,16 +65,16 @@
private final Function<ClientChannelConfiguration, ClientChannel>
chFactory;
/** Client channel holders for each configured address. */
- private final ClientChannelHolder[] channels;
+ private final AtomicReference<List<ClientChannelHolder>> channels = new
AtomicReference<>();
/** Index of the current channel. */
- private int curChIdx;
+ private volatile int curChIdx = -1;
- /** Partition awareness enabled. */
- private final boolean partitionAwarenessEnabled;
+ /** Is all channels should be initialized at one moment. */
Review comment:
`Is` -> `Whether`
##########
File path:
modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
##########
@@ -115,18 +116,33 @@
/** Reconnect throttling retries. See {@code reconnectThrottlingPeriod}. */
private int reconnectThrottlingRetries = 3;
+ /**
+ * Try use other limited number of channels to send a request if default
channel is not responding.
Review comment:
Please move this explanation to public setter/getter
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelHolder.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.client.ClientAuthenticationException;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Channels holder.
+ */
+class ClientChannelHolder {
+ /** Channel configuration. */
+ final ClientChannelConfiguration chCfg;
+
+ /** Channel. */
+ private volatile ClientChannel ch;
+
+ /** Address that holder is bind to (chCfg.addr) is not in use now. So
close the holder */
+ private volatile boolean close;
+
+ /** Timestamps of reconnect retries. */
+ private final long[] reconnectRetries;
+
+ /** Channel factory. */
+ private final Function<ClientChannelConfiguration, ClientChannel>
chFactory;
+
+ /** Callback invokes when new channel create */
+ private final BiConsumer<ClientChannelHolder, ClientChannel>
onChannelCreate;
+
+ /** Callback invokes when channel close */
+ private final Consumer<ClientChannel> onChannelClose;
+
+ /**
+ * @param chCfg Channel config.
+ */
+ ClientChannelHolder(ClientChannelConfiguration chCfg,
+ Function<ClientChannelConfiguration, ClientChannel>
chFactory,
+ BiConsumer<ClientChannelHolder, ClientChannel>
onChannelCreate,
+ Consumer<ClientChannel> onChannelClose) {
+ this.chCfg = chCfg;
+ this.chFactory = chFactory;
+ this.onChannelCreate = onChannelCreate;
+ this.onChannelClose = onChannelClose;
+
+ reconnectRetries = chCfg.getReconnectThrottlingRetries() > 0 &&
chCfg.getReconnectThrottlingPeriod() > 0L ?
+ new long[chCfg.getReconnectThrottlingRetries()] : null;
+ }
+
+ /**
+ * @return Whether reconnect throttling should be applied.
+ */
+ boolean applyReconnectionThrottling() {
Review comment:
`shouldApplyReconnectionThrottling`
##########
File path:
modules/kubernetes/src/main/java/org/apache/ignite/kubernetes/KubernetesConnectorConfigurator.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.kubernetes;
+
+/**
+ * Interface provide methods to configure Kubernetes connection.
+ */
+public interface KubernetesConnectorConfigurator {
Review comment:
I think we should remove this interface. There is hardly any value in
sharing an interface across thin and thick configurations, but it can cause
difficulties when APIs evolve in the future.
##########
File path:
modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/ThinClientKubernetesAddressFinder.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes;
+
+import java.net.InetAddress;
+import java.util.function.Supplier;
+import org.apache.ignite.kubernetes.KubernetesConnectorConfigurator;
+import org.apache.ignite.kubernetes.KubernetesConnectorDefaults;
+import org.apache.ignite.kubernetes.KubernetesServiceAddressResolver;
+
+/**
+ * Address finder for automatic lookup of Ignite nodes running in Kubernetes
environment. All Ignite nodes have to
+ * deployed as Kubernetes pods in order to be found. Applications and Ignite
nodes running outside of Kubernetes
+ * will not be able to reach the containerized counterparts.
+ * <p>
+ * The implementation is based on a distinct Kubernetes service that has to be
created and should be deployed prior
+ * Ignite nodes startup. The service will maintain a list of all endpoints
(internal IP addresses) of all containerized
+ * Ignite pods running so far. The name of the service must be equal to {@link
#setServiceName(String)} which is
+ * `ignite` by default.
+ * <p>
+ * As for Ignite pods, it's recommended to label them in such a way that the
service will use the label in its selector
Review comment:
This paragraph is also confusing. We don't care how users define their
services and use their selectors, why mention that?
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -98,265 +101,291 @@
/** Channels reinit was scheduled. */
private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
- /** Affinity map update is in progress. */
- private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
-
/** Channel is closed. */
private volatile boolean closed;
/** Fail (disconnect) listeners. */
- private ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+ private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
- /**
- * Constructor.
- */
- ReliableChannel(
- Function<ClientChannelConfiguration, ClientChannel> chFactory,
- ClientConfiguration clientCfg,
- IgniteBinary binary
- ) throws ClientException {
+ /** Fail (disconnect) listeners. */
+ private final ArrayList<Predicate<ClientChannel>> topChangeFilters = new
ArrayList<>();
+
+ /** Guard channels and curChIdx together. */
+ private final ReadWriteLock curChannelsGuard = new
ReentrantReadWriteLock();
+
+ /** Constructor. */
+ ReliableChannel(ClientConfiguration clientCfg,
+ Function<ClientChannelConfiguration, ClientChannel>
chFactory,
+ boolean initAllChannels) {
if (chFactory == null)
throw new NullPointerException("chFactory");
if (clientCfg == null)
throw new NullPointerException("clientCfg");
+ this.clientCfg = clientCfg;
this.chFactory = chFactory;
+ this.initAllChannels = initAllChannels;
+ }
+
+ /** Should the channel initialization be stopped. */
+ private boolean stopInitCondition() {
+ return scheduledChannelsReinit.get() || closed;
+ }
- List<InetSocketAddress> addrs =
parseAddresses(clientCfg.getAddresses());
+ /** Callback is invoked after new ClientChannel has created. */
+ private final BiConsumer<ClientChannelHolder, ClientChannel>
onChannelCreate = (holder, ch) -> {
+ ch.addTopologyChangeListener(channel -> {
+ if (topChangeFilters.stream().allMatch(s -> s.test(channel)))
+ channelsInit(true);
+ });
- channels = new ClientChannelHolder[addrs.size()];
+ ch.addNotificationListener(this);
- for (int i = 0; i < channels.length; i++)
- channels[i] = new ClientChannelHolder(new
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+ nodeChannels.values().remove(holder);
+ nodeChannels.put(ch.serverNodeId(), holder);
+ };
- curChIdx = new Random().nextInt(channels.length); // We already
verified there is at least one address.
+ /** Callback is invoked after a ClientChannel has closed. */
+ private final Consumer<ClientChannel> onChannelClose = ch -> {
+ for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+ lsnr.accept(ch);
+ };
- partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() &&
channels.length > 1;
+ /**
+ * Init channel holders to all nodes.
+ * @param force enable to replace existing channels with new holders.
+ */
+ private synchronized void initChannelHolders(boolean force) {
+ // enable parallel threads to schedule new init of channel holders
+ scheduledChannelsReinit.set(false);
+
+ if (!force && channels.get() != null)
+ return;
+
+ List<InetSocketAddress> resolvedAddrs =
parseAddresses(clientCfg.getAddresses());
+
+ List<ClientChannelHolder> holders =
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+ // addr -> (holder, delete)
+ Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs =
holders.stream()
+ .collect(Collectors.toMap(
+ c -> c.chCfg.getAddress(),
+ c -> new T2<>(c, null)
+ ));
+
+ // mark for delete addrs that aren't provided by clientConfig now
+ addrs.keySet()
+ .stream()
+ .filter(addr -> !resolvedAddrs.contains(addr))
+ .forEach(addr -> addrs.get(addr).setValue(true));
+
+ // create new holders for new addrs
+ resolvedAddrs.stream()
+ .filter(addr -> !addrs.containsKey(addr))
+ .forEach(addr -> {
+ ClientChannelHolder hld = new ClientChannelHolder(
+ new ClientChannelConfiguration(clientCfg, addr),
chFactory, onChannelCreate, onChannelClose);
+
+ addrs.put(addr, new T2<>(hld, false));
+ });
+
+ if (!stopInitCondition()) {
+ List<ClientChannelHolder> list = new ArrayList<>();
+ // The variable holds a new index of default channel after
topology change.
+ // Suppose that reuse of the channel is better than open new
connection.
+ int dfltChannelIdx = -1;
+
+ ClientChannelHolder currHolder = null;
+ if (curChIdx != -1)
+ currHolder = channels.get().get(curChIdx);
+
+ for (T2<ClientChannelHolder, Boolean> t : addrs.values()) {
+ ClientChannelHolder hld = t.get1();
+ Boolean markForDelete = t.get2();
+
+ if (markForDelete == null) {
+ // this channel is still in use
+ list.add(hld);
+ if (hld == currHolder)
+ dfltChannelIdx = list.size() - 1;
- affinityCtx = new ClientCacheAffinityContext(binary);
+ }
+ else if (markForDelete) {
+ // this holder should be deleted now
+ nodeChannels.values().remove(hld);
+ hld.close();
+ }
+ else {
+ // this channel is new
+ list.add(hld);
+ }
+ }
- ClientConnectionException lastEx = null;
+ if (dfltChannelIdx == -1)
+ dfltChannelIdx = new Random().nextInt(list.size());
- for (int i = 0; i < channels.length; i++) {
+ curChannelsGuard.writeLock().lock();
Review comment:
Do we need a lock around AtomicReference usage?
##########
File path:
modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/ThinClientKubernetesAddressFinder.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes;
+
+import java.net.InetAddress;
+import java.util.function.Supplier;
+import org.apache.ignite.kubernetes.KubernetesConnectorConfigurator;
+import org.apache.ignite.kubernetes.KubernetesConnectorDefaults;
+import org.apache.ignite.kubernetes.KubernetesServiceAddressResolver;
+
+/**
+ * Address finder for automatic lookup of Ignite nodes running in Kubernetes
environment. All Ignite nodes have to
+ * deployed as Kubernetes pods in order to be found. Applications and Ignite
nodes running outside of Kubernetes
+ * will not be able to reach the containerized counterparts.
+ * <p>
+ * The implementation is based on a distinct Kubernetes service that has to be
created and should be deployed prior
+ * Ignite nodes startup. The service will maintain a list of all endpoints
(internal IP addresses) of all containerized
Review comment:
I know this was copied from existing `TcpDiscoveryKubernetesIpFinder`,
but the text is confusing:
* `prior Ignite nodes startup` - not true, k8s service can be created before
or after the pods - does not matter
* `service will maintain a list of all endpoints` - misleading, service is
an abstraction over a set of pods, it does not maintain endpoints, and this is
k8s implementation details
https://kubernetes.io/docs/concepts/services-networking/service/
##########
File path:
modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/ThinClientKubernetesAddressFinder.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes;
+
+import java.net.InetAddress;
+import java.util.function.Supplier;
+import org.apache.ignite.kubernetes.KubernetesConnectorConfigurator;
+import org.apache.ignite.kubernetes.KubernetesConnectorDefaults;
+import org.apache.ignite.kubernetes.KubernetesServiceAddressResolver;
+
+/**
+ * Address finder for automatic lookup of Ignite nodes running in Kubernetes
environment. All Ignite nodes have to
Review comment:
have to **be** deployed
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelHolder.java
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.client.ClientAuthenticationException;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Channels holder.
+ */
+class ClientChannelHolder {
+ /** Channel configuration. */
+ final ClientChannelConfiguration chCfg;
Review comment:
`private`
##########
File path:
modules/kubernetes/src/main/java/org/apache/ignite/kubernetes/KubernetesServiceAddressResolver.java
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.kubernetes;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.SecureRandom;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * The class is responsible to fetch list of IP address for all pods that runs
the specified kubernetes service.
+ */
+public class KubernetesServiceAddressResolver {
Review comment:
I don't think this should be in the public package, users won't use this
class directly.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannelFacade.java
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.client.ClientAuthenticationException;
+import org.apache.ignite.client.ClientAuthorizationException;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.configuration.ClientConfiguration;
+
+/**
+ * Communication channel with failover and partition awareness.
+ */
+final class ReliableChannelFacade implements AutoCloseable {
Review comment:
Can you please explain the purpose of this class?
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -98,265 +101,291 @@
/** Channels reinit was scheduled. */
private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
- /** Affinity map update is in progress. */
- private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
-
/** Channel is closed. */
private volatile boolean closed;
/** Fail (disconnect) listeners. */
- private ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+ private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
- /**
- * Constructor.
- */
- ReliableChannel(
- Function<ClientChannelConfiguration, ClientChannel> chFactory,
- ClientConfiguration clientCfg,
- IgniteBinary binary
- ) throws ClientException {
+ /** Fail (disconnect) listeners. */
+ private final ArrayList<Predicate<ClientChannel>> topChangeFilters = new
ArrayList<>();
+
+ /** Guard channels and curChIdx together. */
+ private final ReadWriteLock curChannelsGuard = new
ReentrantReadWriteLock();
+
+ /** Constructor. */
+ ReliableChannel(ClientConfiguration clientCfg,
+ Function<ClientChannelConfiguration, ClientChannel>
chFactory,
+ boolean initAllChannels) {
if (chFactory == null)
throw new NullPointerException("chFactory");
if (clientCfg == null)
throw new NullPointerException("clientCfg");
+ this.clientCfg = clientCfg;
this.chFactory = chFactory;
+ this.initAllChannels = initAllChannels;
+ }
+
+ /** Should the channel initialization be stopped. */
+ private boolean stopInitCondition() {
+ return scheduledChannelsReinit.get() || closed;
+ }
- List<InetSocketAddress> addrs =
parseAddresses(clientCfg.getAddresses());
+ /** Callback is invoked after new ClientChannel has created. */
+ private final BiConsumer<ClientChannelHolder, ClientChannel>
onChannelCreate = (holder, ch) -> {
+ ch.addTopologyChangeListener(channel -> {
+ if (topChangeFilters.stream().allMatch(s -> s.test(channel)))
+ channelsInit(true);
+ });
- channels = new ClientChannelHolder[addrs.size()];
+ ch.addNotificationListener(this);
- for (int i = 0; i < channels.length; i++)
- channels[i] = new ClientChannelHolder(new
ClientChannelConfiguration(clientCfg, addrs.get(i)));
+ nodeChannels.values().remove(holder);
+ nodeChannels.put(ch.serverNodeId(), holder);
+ };
- curChIdx = new Random().nextInt(channels.length); // We already
verified there is at least one address.
+ /** Callback is invoked after a ClientChannel has closed. */
+ private final Consumer<ClientChannel> onChannelClose = ch -> {
+ for (Consumer<ClientChannel> lsnr : channelCloseLsnrs)
+ lsnr.accept(ch);
+ };
- partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled() &&
channels.length > 1;
+ /**
+ * Init channel holders to all nodes.
+ * @param force enable to replace existing channels with new holders.
+ */
+ private synchronized void initChannelHolders(boolean force) {
+ // enable parallel threads to schedule new init of channel holders
+ scheduledChannelsReinit.set(false);
+
+ if (!force && channels.get() != null)
+ return;
+
+ List<InetSocketAddress> resolvedAddrs =
parseAddresses(clientCfg.getAddresses());
+
+ List<ClientChannelHolder> holders =
Optional.ofNullable(channels.get()).orElse(new ArrayList<>());
+
+ // addr -> (holder, delete)
+ Map<InetSocketAddress, T2<ClientChannelHolder, Boolean>> addrs =
holders.stream()
+ .collect(Collectors.toMap(
+ c -> c.chCfg.getAddress(),
+ c -> new T2<>(c, null)
+ ));
+
+ // mark for delete addrs that aren't provided by clientConfig now
+ addrs.keySet()
+ .stream()
+ .filter(addr -> !resolvedAddrs.contains(addr))
Review comment:
`resolvedAddrs` is a List, this can be inefficient with large number of
channels and addresses
##########
File path:
modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/ThinClientKubernetesAddressFinder.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes;
+
+import java.net.InetAddress;
+import java.util.function.Supplier;
+import org.apache.ignite.kubernetes.KubernetesConnectorConfigurator;
+import org.apache.ignite.kubernetes.KubernetesConnectorDefaults;
+import org.apache.ignite.kubernetes.KubernetesServiceAddressResolver;
+
+/**
+ * Address finder for automatic lookup of Ignite nodes running in Kubernetes
environment. All Ignite nodes have to
+ * deployed as Kubernetes pods in order to be found. Applications and Ignite
nodes running outside of Kubernetes
+ * will not be able to reach the containerized counterparts.
+ * <p>
+ * The implementation is based on a distinct Kubernetes service that has to be
created and should be deployed prior
+ * Ignite nodes startup. The service will maintain a list of all endpoints
(internal IP addresses) of all containerized
+ * Ignite pods running so far. The name of the service must be equal to {@link
#setServiceName(String)} which is
+ * `ignite` by default.
+ * <p>
+ * As for Ignite pods, it's recommended to label them in such a way that the
service will use the label in its selector
+ * configuration excluding endpoints of irrelevant Kubernetes pods running in
parallel.
+ * <p>
+ * The address finder, in its turn, will call this service to retrieve Ignite
pods IP addresses. The port will be
+ * set with {@link ReliableChannel#parseAddresses(String[])}. Make sure that
all Ignite pods occupy a similar
Review comment:
`ReliableChannel` is an implementation detail in the internal package,
let's not mention it in the javadoc of the public class.
##########
File path:
modules/kubernetes/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/kubernetes/ThinClientKubernetesAddressFinder.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.ipfinder.kubernetes;
Review comment:
I think this belongs to `org.apache.ignite.client` package
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
##########
@@ -98,265 +101,291 @@
/** Channels reinit was scheduled. */
private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
- /** Affinity map update is in progress. */
- private final AtomicBoolean affinityUpdateInProgress = new AtomicBoolean();
-
/** Channel is closed. */
private volatile boolean closed;
/** Fail (disconnect) listeners. */
- private ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
+ private final ArrayList<Runnable> chFailLsnrs = new ArrayList<>();
- /**
- * Constructor.
- */
- ReliableChannel(
- Function<ClientChannelConfiguration, ClientChannel> chFactory,
- ClientConfiguration clientCfg,
- IgniteBinary binary
- ) throws ClientException {
+ /** Fail (disconnect) listeners. */
Review comment:
Copypasted javadoc
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]